|
|
@@ -1,3 +1,4 @@
|
|
|
+import argparse
|
|
|
import os
|
|
|
import sys
|
|
|
import subprocess
|
|
|
@@ -25,7 +26,9 @@ class KeePass:
|
|
|
return self._exec(["show", "-a", attribute, self.path, key]).strip()
|
|
|
|
|
|
def read_entry_attachment(self, key, attachment):
|
|
|
- return self._exec(["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"])
|
|
|
+ return self._exec(
|
|
|
+ ["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"]
|
|
|
+ )
|
|
|
|
|
|
def _exec(self, args: list[Any]):
|
|
|
try:
|
|
|
@@ -36,7 +39,11 @@ class KeePass:
|
|
|
|
|
|
@classmethod
|
|
|
def new(cls, path: Path):
|
|
|
- binary = Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe" if is_windows else "keepassxc-cli"
|
|
|
+ binary = (
|
|
|
+ Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe"
|
|
|
+ if is_windows
|
|
|
+ else "keepassxc-cli"
|
|
|
+ )
|
|
|
return cls(path=path, bin=binary)
|
|
|
|
|
|
|
|
|
@@ -45,8 +52,7 @@ class Secret:
|
|
|
name: str
|
|
|
mode: int
|
|
|
|
|
|
- def create(self, keepass: KeePass):
|
|
|
- ...
|
|
|
+ def create(self, keepass: KeePass): ...
|
|
|
|
|
|
@classmethod
|
|
|
def from_line(cls, line: str):
|
|
|
@@ -66,6 +72,7 @@ class Secret:
|
|
|
lines = f.readlines()
|
|
|
return [cls.from_line(l.strip()) for l in lines]
|
|
|
|
|
|
+
|
|
|
@dataclass
|
|
|
class SecretKeepassAttachment(Secret):
|
|
|
key: str
|
|
|
@@ -110,14 +117,7 @@ class SecretFile(Secret):
|
|
|
@classmethod
|
|
|
def from_line(cls, path: str):
|
|
|
path = Path(path).expanduser()
|
|
|
- return cls(host_path=path, name=path.name, mode=0o0400)
|
|
|
-
|
|
|
-
|
|
|
-def to_source_path(path: Path):
|
|
|
- mount_base = PurePosixPath("/mnt") / "source"
|
|
|
- inner_path = PurePosixPath(path)
|
|
|
- with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath(*inner_path.parts[1:])
|
|
|
- return mount_base / with_drive.relative_to(with_drive.anchor)
|
|
|
+ return cls(host_path=path, name=path.name, mode=0o0400)
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
@@ -148,32 +148,39 @@ class BorgmaticContainer:
|
|
|
if ssh_auth_sock:
|
|
|
volumes += [f"{ssh_auth_sock}:{ssh_auth_sock}:Z"]
|
|
|
|
|
|
- volumes += [
|
|
|
- f"{vol}:{to_source_path(vol)}:ro" for vol in data_sources
|
|
|
- ]
|
|
|
+ volumes += [f"{vol}:{self.to_source_path(vol)}:ro" for vol in data_sources]
|
|
|
|
|
|
volume_args = [a for vol in volumes for a in ["-v", vol]]
|
|
|
|
|
|
- secrets_args = [a for s in secret_sources for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]]
|
|
|
-
|
|
|
- args = [
|
|
|
- "podman",
|
|
|
- "run",
|
|
|
- "-h",
|
|
|
- self.hostname,
|
|
|
- "--detach",
|
|
|
- "--name",
|
|
|
- container_name,
|
|
|
- "-e",
|
|
|
- "SSH_AUTH_SOCK",
|
|
|
- "-e",
|
|
|
- "TZ=Europe/Paris",
|
|
|
- "-e",
|
|
|
- "SSH_KEY_NAME",
|
|
|
- "-e",
|
|
|
- f"HOST_LOGIN={self.login}",
|
|
|
- "--security-opt=label=disable"
|
|
|
- ] + volume_args + secrets_args + [self.image]
|
|
|
+ secrets_args = [
|
|
|
+ a
|
|
|
+ for s in secret_sources
|
|
|
+ for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]
|
|
|
+ ]
|
|
|
+
|
|
|
+ args = (
|
|
|
+ [
|
|
|
+ "podman",
|
|
|
+ "run",
|
|
|
+ "-h",
|
|
|
+ self.hostname,
|
|
|
+ "--detach",
|
|
|
+ "--name",
|
|
|
+ container_name,
|
|
|
+ "-e",
|
|
|
+ "SSH_AUTH_SOCK",
|
|
|
+ "-e",
|
|
|
+ "TZ=Europe/Paris",
|
|
|
+ "-e",
|
|
|
+ "SSH_KEY_NAME",
|
|
|
+ "-e",
|
|
|
+ f"HOST_LOGIN={self.login}",
|
|
|
+ "--security-opt=label=disable",
|
|
|
+ ]
|
|
|
+ + volume_args
|
|
|
+ + secrets_args
|
|
|
+ + [self.image]
|
|
|
+ )
|
|
|
print(args)
|
|
|
subprocess.run(args)
|
|
|
|
|
|
@@ -185,11 +192,121 @@ class BorgmaticContainer:
|
|
|
args += [a for var in env_vars for a in ["-e", var]]
|
|
|
subprocess.run(args + [self.name] + cmd)
|
|
|
|
|
|
+ @staticmethod
|
|
|
+ def to_source_path(path: Path):
|
|
|
+ mount_base = PurePosixPath("/mnt") / "source"
|
|
|
+ inner_path = PurePosixPath(path)
|
|
|
+ with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath(
|
|
|
+ *inner_path.parts[1:]
|
|
|
+ )
|
|
|
+ return mount_base / with_drive.relative_to(with_drive.anchor)
|
|
|
+
|
|
|
@classmethod
|
|
|
def new(cls, hostname: str, login: str):
|
|
|
return cls(hostname, login, f"borgmatic_{login}")
|
|
|
|
|
|
|
|
|
+class CliArguments:
|
|
|
+ @staticmethod
|
|
|
+ def read_command(parser):
|
|
|
+ args = parser.parse_args()
|
|
|
+ return args.command(args)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def new() -> argparse.ArgumentParser:
|
|
|
+ parser = argparse.ArgumentParser(prog=sys.argv[0])
|
|
|
+ subparsers = parser.add_subparsers()
|
|
|
+
|
|
|
+ for sub in [
|
|
|
+ CommandStart,
|
|
|
+ CommandRm,
|
|
|
+ CommandBash,
|
|
|
+ CommandCreateRepo,
|
|
|
+ CommandExportKey,
|
|
|
+ CommandCreateSecrets,
|
|
|
+ ]:
|
|
|
+ p = subparsers.add_parser(sub.command, help=sub.help)
|
|
|
+ sub.init_subparser(p)
|
|
|
+ p.set_defaults(command=sub)
|
|
|
+
|
|
|
+ return parser
|
|
|
+
|
|
|
+
|
|
|
+class Command:
|
|
|
+ def __init__(self, namespace) -> None:
|
|
|
+ for k, v in vars(namespace).items():
|
|
|
+ if k != "type_":
|
|
|
+ setattr(self, k, v)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def init_subparser(cls, p): ...
|
|
|
+
|
|
|
+
|
|
|
+class CommandStart(Command):
|
|
|
+ command = "start"
|
|
|
+ help = "start container"
|
|
|
+
|
|
|
+ def run(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ container: BorgmaticContainer,
|
|
|
+ data_sources: list[Path],
|
|
|
+ secret_sources: list[Secret],
|
|
|
+ **kwargs,
|
|
|
+ ):
|
|
|
+ container.run(data_sources, secret_sources)
|
|
|
+
|
|
|
+
|
|
|
+class CommandRm(Command):
|
|
|
+ command = "rm"
|
|
|
+ help = "remove container"
|
|
|
+
|
|
|
+ def run(self, *, container: BorgmaticContainer, **kwargs):
|
|
|
+ container.rm()
|
|
|
+
|
|
|
+
|
|
|
+class CommandBash(Command):
|
|
|
+ command = "bash"
|
|
|
+ help = "run shell in container"
|
|
|
+
|
|
|
+ def run(self, *, container: BorgmaticContainer, **kwargs):
|
|
|
+ container.exec(["bash"])
|
|
|
+
|
|
|
+
|
|
|
+class CommandCreateRepo(Command):
|
|
|
+ command = "create_repo"
|
|
|
+ help = "create repository"
|
|
|
+ env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
|
|
|
+
|
|
|
+ def run(self, *, container: BorgmaticContainer, **kwargs):
|
|
|
+ container.exec(
|
|
|
+ ["borgmatic", "repo-create", "--encryption", "repokey"], self.env_vars
|
|
|
+ )
|
|
|
+
|
|
|
+
|
|
|
+class CommandExportKey(Command):
|
|
|
+ command = "export_key"
|
|
|
+ help = "export the repository key"
|
|
|
+ env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
|
|
|
+
|
|
|
+ def run(self, *, container: BorgmaticContainer, **kwargs):
|
|
|
+ container.exec(["borgmatic", "export", "key"], self.env_vars)
|
|
|
+
|
|
|
+
|
|
|
+class CommandCreateSecrets(Command):
|
|
|
+ command = "create_secrets"
|
|
|
+ help = "create podman secrets"
|
|
|
+
|
|
|
+ def run(self, *, secret_sources: list[Secret], **kwargs):
|
|
|
+ keepass = KeePass.new(self.keepass_path)
|
|
|
+ for s in secret_sources:
|
|
|
+ s.create(keepass)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def init_subparser(cls, p):
|
|
|
+ p.add_argument("keepass_path", type=Path, help="Path to the keepass")
|
|
|
+
|
|
|
+
|
|
|
def main():
|
|
|
login = os.getlogin()
|
|
|
hostname = socket.gethostname()
|
|
|
@@ -201,33 +318,14 @@ def main():
|
|
|
print("no secret required ?")
|
|
|
|
|
|
container = BorgmaticContainer.new(hostname, login)
|
|
|
- env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
|
|
|
-
|
|
|
- try:
|
|
|
- if sys.argv[1] == "create_secrets":
|
|
|
- keepass_path = Path(sys.argv[2])
|
|
|
- keepass = KeePass.new(keepass_path)
|
|
|
- for s in secret_sources:
|
|
|
- s.create(keepass)
|
|
|
-
|
|
|
- elif sys.argv[1] == "start":
|
|
|
- container.run(data_sources, secret_sources)
|
|
|
-
|
|
|
- elif sys.argv[1] == "rm":
|
|
|
- container.rm()
|
|
|
-
|
|
|
- elif sys.argv[1] == "bash":
|
|
|
- container.exec(["bash"])
|
|
|
-
|
|
|
- elif sys.argv[1] == "create_repo":
|
|
|
- container.exec(["borgmatic", "repo-create", "--encryption", "repokey"], env_vars)
|
|
|
-
|
|
|
- elif sys.argv[1] == "export_key":
|
|
|
- container.exec(["borgmatic", "export", "key"], env_vars)
|
|
|
|
|
|
- except IndexError:
|
|
|
- print("You should provide an argument")
|
|
|
- exit(1)
|
|
|
+ parser = CliArguments.new()
|
|
|
+ command = CliArguments.read_command(parser)
|
|
|
+ command.run(
|
|
|
+ secret_sources=secret_sources,
|
|
|
+ data_sources=data_sources,
|
|
|
+ container=container,
|
|
|
+ )
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|