فهرست منبع

Initial commit

jherve 1 ماه پیش
کامیت
cf5967074b
10فایلهای تغییر یافته به همراه509 افزوده شده و 0 حذف شده
  1. 5 0
      .gitignore
  2. 9 0
      README.md
  3. 28 0
      pc_backup/__init__.py
  4. 128 0
      pc_backup/cli.py
  5. 31 0
      pc_backup/config/borgmatic/common.yaml
  6. 161 0
      pc_backup/container.py
  7. 14 0
      pc_backup/env.py
  8. 38 0
      pc_backup/keepass.py
  9. 81 0
      pc_backup/secret.py
  10. 14 0
      pyproject.toml

+ 5 - 0
.gitignore

@@ -0,0 +1,5 @@
+pc_backup/config/.bash_history
+data_sources_*
+secret_sources_*
+*.egg-info/
+__pycache__

+ 9 - 0
README.md

@@ -0,0 +1,9 @@
+## Repository creation
+
+1. `borgmatic repo-create --encryption repokey`
+1. `borgmatic key export`
+
+Note : This can also be done this way :
+
+1. `BORG_PASSPHRASE_NAME=<passphrase_name> STORAGE_BOX_USER=<user> SSH_KEY_NAME=<name> python3 start.py create_repo`
+1. `BORG_PASSPHRASE_NAME=<passphrase_name> STORAGE_BOX_USER=<user> SSH_KEY_NAME=<name> python3 start.py export_key`

+ 28 - 0
pc_backup/__init__.py

@@ -0,0 +1,28 @@
+from pc_backup.container import BorgmaticContainer, Configuration
+from pc_backup.cli import CliArguments
+from pc_backup.env import getlogin, gethostname
+
+
+def main():
+    login = getlogin()
+    hostname = gethostname()
+
+    config = Configuration.read(hostname, login, Configuration.get_config_dir())
+
+    if not config.secret_sources:
+        print("no secret required ?")
+
+    container = BorgmaticContainer.new(hostname, login)
+
+    parser = CliArguments.new()
+    command = CliArguments.read_command(parser)
+    command.run(
+        config=config,
+        secret_sources=config.secret_sources,
+        data_sources=config.data_sources,
+        container=container,
+    )
+
+
+if __name__ == "__main__":
+    main()

+ 128 - 0
pc_backup/cli.py

@@ -0,0 +1,128 @@
+import argparse
+from argparse import Namespace
+import sys
+from pathlib import Path
+
+from pc_backup.container import BorgmaticContainer, Configuration
+from pc_backup.keepass import KeePass
+from pc_backup.secret import Secret
+
+
+class CliArguments:
+    @staticmethod
+    def read_command(parser):
+        match parser.parse_known_args():
+            case (Namespace(command=command) as args, extra_args) if (
+                command == CommandExec
+            ):
+                args.extra = extra_args
+                return command(args)
+            case (Namespace(command=_) as args, []):
+                return args.command(args)
+            case (Namespace(command=_), extra_args):
+                return parser.error("You passed extra arguments")
+            case _:
+                return parser.error("You should call at least one of the commands")
+
+    @staticmethod
+    def new() -> argparse.ArgumentParser:
+        parser = argparse.ArgumentParser(prog=sys.argv[0])
+        subparsers = parser.add_subparsers()
+
+        for sub in [
+            CommandStart,
+            CommandRm,
+            CommandExec,
+            CommandBash,
+            CommandCreateRepo,
+            CommandExportKey,
+            CommandCreateSecrets,
+        ]:
+            p = subparsers.add_parser(sub.command, help=sub.help)
+            sub.init_subparser(p)
+            p.set_defaults(command=sub)
+
+        return parser
+
+
+class Command:
+    def __init__(self, namespace) -> None:
+        for k, v in vars(namespace).items():
+            if k != "type_":
+                setattr(self, k, v)
+
+    @classmethod
+    def init_subparser(cls, p): ...
+
+
+class CommandStart(Command):
+    command = "start"
+    help = "start container"
+
+    def run(
+        self,
+        *,
+        container: BorgmaticContainer,
+        config: Configuration,
+        **kwargs,
+    ):
+        container.run(config)
+
+
+class CommandRm(Command):
+    command = "rm"
+    help = "remove container"
+
+    def run(self, *, container: BorgmaticContainer, **kwargs):
+        container.rm()
+
+
+class CommandExec(Command):
+    command = "exec"
+    help = "run command in container"
+    env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
+
+    def run(self, *, container: BorgmaticContainer, **kwargs):
+        container.exec(self.extra, self.env_vars)
+
+
+class CommandBash(Command):
+    command = "bash"
+    help = "run shell in container"
+
+    def run(self, *, container: BorgmaticContainer, **kwargs):
+        container.exec(["bash"])
+
+
+class CommandCreateRepo(Command):
+    command = "create_repo"
+    help = "create repository"
+    env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
+
+    def run(self, *, container: BorgmaticContainer, **kwargs):
+        container.exec(
+            ["borgmatic", "repo-create", "--encryption", "repokey"], self.env_vars
+        )
+
+
+class CommandExportKey(Command):
+    command = "export_key"
+    help = "export the repository key"
+    env_vars = ["BORG_PASSPHRASE_NAME", "STORAGE_BOX_USER", "SSH_KEY_NAME"]
+
+    def run(self, *, container: BorgmaticContainer, **kwargs):
+        container.exec(["borgmatic", "export", "key"], self.env_vars)
+
+
+class CommandCreateSecrets(Command):
+    command = "create_secrets"
+    help = "create podman secrets"
+
+    def run(self, *, secret_sources: list[Secret], **kwargs):
+        keepass = KeePass.new(self.keepass_path)
+        for s in secret_sources:
+            s.create(keepass)
+
+    @classmethod
+    def init_subparser(cls, p):
+        p.add_argument("keepass_path", type=Path, help="Path to the keepass")

+ 31 - 0
pc_backup/config/borgmatic/common.yaml

@@ -0,0 +1,31 @@
+repositories:
+  - path: ssh://${STORAGE_BOX_USER}@${STORAGE_BOX_USER}.your-storagebox.de:23/./borg-repo
+    label: hetzner
+
+remote_path: borg-1.4
+ssh_command: ssh -i /var/run/secrets/${SSH_KEY_NAME}
+log_file: /root/.local/share/borgmatic/log
+
+encryption_passphrase: "{credential container ${BORG_PASSPHRASE_NAME}}"
+compression: lz4
+
+checks:
+  - name: repository
+    frequency: 2 weeks
+  - name: archives
+    frequency: always
+  - name: extract
+    frequency: 2 weeks
+  - name: data
+    frequency: 1 month
+
+commands:
+  - before: everything
+    run:
+      - echo "Starting a backup job."
+  - after: everything
+    run:
+      - echo "Backup created."
+  - after: error
+    run:
+      - echo "Error while creating a backup."

+ 161 - 0
pc_backup/container.py

@@ -0,0 +1,161 @@
+import os
+import subprocess
+from pathlib import Path, PurePosixPath
+from dataclasses import dataclass
+
+from pc_backup.secret import Secret
+from pc_backup.env import is_windows
+
+
+def read_data_sources(file: Path) -> list[Path]:
+    with open(file) as f:
+        paths = f.readlines()
+        return [Path(p_str.strip()).expanduser() for p_str in paths]
+
+
+@dataclass
+class Configuration:
+    secret_sources: list[Secret]
+    data_sources: list[Path]
+    borgmatic_d_path: Path
+    borgmatic_path: Path
+    history_file: Path
+    ssh_auth_sock: Path | None
+
+    @staticmethod
+    def get_config_dir() -> Path:
+        if is_windows():
+            program_data = Path(os.getenv("ProgramData"))
+            return program_data / "pc_backup"
+        else:
+            return Path.home() / ".config" / "pc_backup"
+
+    @classmethod
+    def read(cls, hostname: str, login: str, config_dir: Path):
+        # The configuration directory has to be organized like this :
+        #
+        # .
+        # ├── borgmatic
+        # │   └── common.yaml
+        # ├── <host1>
+        # │   ├── <user1>
+        # │   │   ├── borgmatic.d
+        # │   │   │   ├── <config1>.yaml
+        # │   │   │   ├── <config2>.yaml
+        # │   │   │   └── windows.yaml
+        # │   │   ├── data_sources
+        # │   │   └── secret_sources
+        # │   └── <user2>
+        # │       ├── borgmatic.d
+        # │       │   ├── ...
+        # │       ├── data_sources
+        # │       └── secret_sources
+        # └── <host2>
+        #     ├── <user1>
+        #     │   ├── borgmatic.d
+        #     │   │   ├── ...
+        #     │   ├── data_sources
+        #     │   └── secret_sources
+        #     └── <user2>
+        #         ├── borgmatic.d
+        #         │   ├── ...
+        #         ├── data_sources
+        #         └── secret_sources
+        specific_config_dir = config_dir / hostname / login
+
+        secret_sources_file = specific_config_dir / "secret_sources"
+        data_sources_file = specific_config_dir / "data_sources"
+        ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
+
+        return cls(
+            secret_sources=Secret.read_sources(secret_sources_file),
+            data_sources=read_data_sources(data_sources_file),
+            borgmatic_d_path=specific_config_dir / "borgmatic.d",
+            borgmatic_path=config_dir / "borgmatic",
+            history_file=specific_config_dir / ".bash_history",
+            ssh_auth_sock=Path(ssh_auth_sock) if ssh_auth_sock else None,
+        )
+
+
+@dataclass
+class BorgmaticContainer:
+    hostname: str
+    login: str
+    name: str
+    image: str = "ghcr.io/borgmatic-collective/borgmatic"
+
+    def run(self, config: Configuration):
+        container_name = f"borgmatic_{self.login}"
+
+        config.history_file.touch()
+        volumes = [
+            f"{config.borgmatic_d_path}:/etc/borgmatic.d/",
+            f"{config.borgmatic_path}:/etc/borgmatic/",
+            f"{config.history_file}:/root/.bash_history",
+            "borg_ssh_dir:/root/.ssh",
+            "borg_config:/root/.config/borg",
+            "borg_cache:/root/.cache/borg",
+            "borgmatic_state:/root/.local/state/borgmatic",
+            "borgmatic_log:/root/.local/share/borgmatic",
+        ]
+        if config.ssh_auth_sock:
+            volumes += [f"{config.ssh_auth_sock}:{config.ssh_auth_sock}:Z"]
+
+        volumes += [
+            f"{vol}:{self.to_source_path(vol)}:ro" for vol in config.data_sources
+        ]
+
+        volume_args = [a for vol in volumes for a in ["-v", vol]]
+
+        secrets_args = [
+            a
+            for s in config.secret_sources
+            for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]
+        ]
+
+        args = (
+            [
+                "podman",
+                "run",
+                "-h",
+                self.hostname,
+                "--detach",
+                "--name",
+                container_name,
+                "-e",
+                "SSH_AUTH_SOCK",
+                "-e",
+                "TZ=Europe/Paris",
+                "-e",
+                "SSH_KEY_NAME",
+                "-e",
+                f"HOST_LOGIN={self.login}",
+                "--security-opt=label=disable",
+            ]
+            + volume_args
+            + secrets_args
+            + [self.image]
+        )
+        print(" ".join(args))
+        subprocess.run(args)
+
+    def rm(self):
+        subprocess.run(["podman", "rm", "-f", self.name])
+
+    def exec(self, cmd: list[str], env_vars: list[str] = []):
+        args = ["podman", "exec", "-ti"]
+        args += [a for var in env_vars for a in ["-e", var]]
+        subprocess.run(args + [self.name] + cmd)
+
+    @staticmethod
+    def to_source_path(path: Path):
+        mount_base = PurePosixPath("/mnt") / "source"
+        inner_path = PurePosixPath(path)
+        with_drive = PurePosixPath(inner_path.parts[0].replace(":", "")).joinpath(
+            *inner_path.parts[1:]
+        )
+        return mount_base / with_drive.relative_to(with_drive.anchor)
+
+    @classmethod
+    def new(cls, hostname: str, login: str):
+        return cls(hostname, login, f"borgmatic_{login}")

+ 14 - 0
pc_backup/env.py

@@ -0,0 +1,14 @@
+import os
+import socket
+
+
+def getlogin() -> str:
+    return os.getlogin()
+
+
+def gethostname() -> str:
+    return socket.gethostname()
+
+
+def is_windows() -> bool:
+    return os.name == "nt"

+ 38 - 0
pc_backup/keepass.py

@@ -0,0 +1,38 @@
+import subprocess
+from pathlib import Path
+from dataclasses import dataclass
+from typing import Any
+
+from pc_backup.env import is_windows
+
+
+@dataclass
+class KeePass:
+    path: Path
+    bin: str | Path
+
+    def read_entry_attribute(self, key, attribute):
+        print(f"reading attr {attribute} of key {key}..")
+        return self._exec(["show", "-a", attribute, self.path, key]).strip()
+
+    def read_entry_attachment(self, key, attachment):
+        print(f"reading attachment {attachment} of key {key}..")
+        return self._exec(
+            ["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"]
+        )
+
+    def _exec(self, args: list[Any]):
+        try:
+            return subprocess.check_output([self.bin] + args, text=True)
+        except subprocess.CalledProcessError as e:
+            print("\nThere was an error on call to keepass, please check the outout")
+            exit(1)
+
+    @classmethod
+    def new(cls, path: Path):
+        binary = (
+            Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe"
+            if is_windows()
+            else "keepassxc-cli"
+        )
+        return cls(path=path, bin=binary)

+ 81 - 0
pc_backup/secret.py

@@ -0,0 +1,81 @@
+import subprocess
+from pathlib import Path
+from dataclasses import dataclass
+
+from pc_backup.keepass import KeePass
+
+
+@dataclass
+class Secret:
+    name: str
+    mode: int
+
+    def create(self, keepass: KeePass): ...
+
+    @classmethod
+    def from_line(cls, line: str):
+        name, type_, *args = line.split(",")
+        match type_:
+            case "file":
+                sub_class = SecretFile
+            case "keepass-attribute":
+                sub_class = SecretKeepassAttribute
+            case "keepass-attachment":
+                sub_class = SecretKeepassAttachment
+            case _:
+                raise ValueError(f"Cannot read `{line}` as a secret spec")
+
+        return sub_class.from_line(name, *args)
+
+    @classmethod
+    def read_sources(cls, file: Path) -> list["Secret"]:
+        with open(file) as f:
+            lines = f.readlines()
+            return [cls.from_line(l.strip()) for l in lines]
+
+
+@dataclass
+class SecretKeepassAttachment(Secret):
+    key: str
+    attachment: str
+
+    def create(self, keepass: KeePass):
+        value = keepass.read_entry_attachment(self.key, self.attachment)
+        args = ["podman", "secret", "create", "--replace", self.name, "-"]
+        print(args)
+        subprocess.run(args, input=value.encode())
+
+    @classmethod
+    def from_line(cls, name: str, key: str, attachment: str):
+        return cls(name=name, key=key, mode=0o0400, attachment=attachment)
+
+
+@dataclass
+class SecretKeepassAttribute(Secret):
+    key: str
+    attribute: str
+
+    def create(self, keepass: KeePass):
+        value = keepass.read_entry_attribute(self.key, self.attribute)
+        args = ["podman", "secret", "create", "--replace", self.name, "-"]
+        print(args)
+        subprocess.run(args, input=value.encode())
+
+    @classmethod
+    def from_line(cls, name: str, key: str, attribute: str):
+        return cls(name=name, key=key, mode=0o0400, attribute=attribute)
+
+
+@dataclass
+class SecretFile(Secret):
+    host_path: Path
+
+    def create(self, keepass: KeePass):
+        args = ["podman", "secret", "create", "--replace", self.name, self.host_path]
+        print(args)
+        subprocess.run(args)
+
+    @classmethod
+    def from_line(cls, name: str, path: str):
+        path = Path(path).expanduser()
+        return cls(host_path=path, name=name, mode=0o0400)

+ 14 - 0
pyproject.toml

@@ -0,0 +1,14 @@
+[project]
+name = "pc_backup"
+version = "0.3.0"
+description = "My backup solution"
+readme = "README.md"
+requires-python = ">=3.9"
+dependencies = []
+
+[project.scripts]
+my-backup = "pc_backup:main"
+
+[build-system]
+requires = ["setuptools>=61.0"]
+build-backend = "setuptools.build_meta"