|
@@ -1,8 +1,10 @@
|
|
|
import os
|
|
import os
|
|
|
|
|
+import sys
|
|
|
import subprocess
|
|
import subprocess
|
|
|
import socket
|
|
import socket
|
|
|
from pathlib import Path, PurePosixPath
|
|
from pathlib import Path, PurePosixPath
|
|
|
from dataclasses import dataclass
|
|
from dataclasses import dataclass
|
|
|
|
|
+from typing import Any
|
|
|
|
|
|
|
|
is_windows = os.name == "nt"
|
|
is_windows = os.name == "nt"
|
|
|
|
|
|
|
@@ -14,23 +16,101 @@ def read_data_sources(hostname: str, login: str) -> list[Path]:
|
|
|
return [Path(p_str.strip()).expanduser() for p_str in paths]
|
|
return [Path(p_str.strip()).expanduser() for p_str in paths]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+@dataclass
|
|
|
|
|
+class KeePass:
|
|
|
|
|
+ path: Path
|
|
|
|
|
+ bin: str | Path
|
|
|
|
|
+
|
|
|
|
|
+ def read_entry_attribute(self, key, attribute):
|
|
|
|
|
+ return self._exec(["show", "-a", attribute, self.path, key]).strip()
|
|
|
|
|
+
|
|
|
|
|
+ def read_entry_attachment(self, key, attachment):
|
|
|
|
|
+ return self._exec(["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"])
|
|
|
|
|
+
|
|
|
|
|
+ def _exec(self, args: list[Any]):
|
|
|
|
|
+ try:
|
|
|
|
|
+ return subprocess.check_output([self.bin] + args, text=True)
|
|
|
|
|
+ except subprocess.CalledProcessError as e:
|
|
|
|
|
+ print("\nThere was an error on call to keepass, please check the outout")
|
|
|
|
|
+ exit(1)
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def new(cls, path: Path):
|
|
|
|
|
+ binary = Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe" if is_windows else "keepassxc-cli"
|
|
|
|
|
+ return cls(path=path, bin=binary)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
@dataclass
|
|
@dataclass
|
|
|
class Secret:
|
|
class Secret:
|
|
|
- host_path: Path
|
|
|
|
|
name: str
|
|
name: str
|
|
|
mode: int
|
|
mode: int
|
|
|
|
|
|
|
|
|
|
+ def create(self, keepass: KeePass):
|
|
|
|
|
+ ...
|
|
|
|
|
+
|
|
|
@classmethod
|
|
@classmethod
|
|
|
def from_line(cls, line: str):
|
|
def from_line(cls, line: str):
|
|
|
- path = Path(line).expanduser()
|
|
|
|
|
- return cls(host_path=path, name=path.name, mode=0o0400)
|
|
|
|
|
|
|
+ type_, *args = line.split(",")
|
|
|
|
|
+ match type_:
|
|
|
|
|
+ case "file":
|
|
|
|
|
+ return SecretFile.from_line(*args)
|
|
|
|
|
+ case "keepass-attribute":
|
|
|
|
|
+ return SecretKeepassAttribute.from_line(*args)
|
|
|
|
|
+ case "keepass-attachment":
|
|
|
|
|
+ return SecretKeepassAttachment.from_line(*args)
|
|
|
|
|
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def read_sources(cls, hostname: str, login: str) -> list["Secret"]:
|
|
|
|
|
+ file = Path(f"./secret_sources_{hostname}_{login}")
|
|
|
|
|
+ with open(file) as f:
|
|
|
|
|
+ lines = f.readlines()
|
|
|
|
|
+ return [cls.from_line(l.strip()) for l in lines]
|
|
|
|
|
|
|
|
-def read_secret_sources(hostname: str, login: str) -> list[Secret]:
|
|
|
|
|
- file = Path(f"./secret_sources_{hostname}_{login}")
|
|
|
|
|
- with open(file) as f:
|
|
|
|
|
- lines = f.readlines()
|
|
|
|
|
- return [Secret.from_line(l.strip()) for l in lines]
|
|
|
|
|
|
|
+@dataclass
|
|
|
|
|
+class SecretKeepassAttachment(Secret):
|
|
|
|
|
+ key: str
|
|
|
|
|
+ attachment: str
|
|
|
|
|
+
|
|
|
|
|
+ def create(self, keepass: KeePass):
|
|
|
|
|
+ value = keepass.read_entry_attachment(self.key, self.attachment)
|
|
|
|
|
+ args = ["podman", "secret", "create", "--replace", self.name, "-"]
|
|
|
|
|
+ print(args)
|
|
|
|
|
+ subprocess.run(args, input=value.encode())
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def from_line(cls, key, attachment):
|
|
|
|
|
+ return cls(name=key, key=key, mode=0o0400, attachment=attachment)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@dataclass
|
|
|
|
|
+class SecretKeepassAttribute(Secret):
|
|
|
|
|
+ key: str
|
|
|
|
|
+ attribute: str
|
|
|
|
|
+
|
|
|
|
|
+ def create(self, keepass: KeePass):
|
|
|
|
|
+ value = keepass.read_entry_attribute(self.key, self.attribute)
|
|
|
|
|
+ args = ["podman", "secret", "create", "--replace", self.name, "-"]
|
|
|
|
|
+ print(args)
|
|
|
|
|
+ subprocess.run(args, input=value.encode())
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def from_line(cls, key, attribute):
|
|
|
|
|
+ return cls(name=key, key=key, mode=0o0400, attribute=attribute)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@dataclass
|
|
|
|
|
+class SecretFile(Secret):
|
|
|
|
|
+ host_path: Path
|
|
|
|
|
+
|
|
|
|
|
+ def create(self, keepass: KeePass):
|
|
|
|
|
+ args = ["podman", "secret", "create", "--replace", self.name, self.host_path]
|
|
|
|
|
+ print(args)
|
|
|
|
|
+ subprocess.run(args)
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def from_line(cls, path: str):
|
|
|
|
|
+ path = Path(path).expanduser()
|
|
|
|
|
+ return cls(host_path=path, name=path.name, mode=0o0400)
|
|
|
|
|
|
|
|
|
|
|
|
|
def to_source_path(path: Path):
|
|
def to_source_path(path: Path):
|
|
@@ -40,11 +120,8 @@ def to_source_path(path: Path):
|
|
|
return mount_base / with_drive.relative_to(with_drive.anchor)
|
|
return mount_base / with_drive.relative_to(with_drive.anchor)
|
|
|
|
|
|
|
|
|
|
|
|
|
-def main():
|
|
|
|
|
- login = os.getlogin()
|
|
|
|
|
- hostname = socket.gethostname()
|
|
|
|
|
|
|
+def start_borgmatic_container(hostname: str, login: str, secret_sources: list[Secret]):
|
|
|
data_sources = read_data_sources(hostname, login)
|
|
data_sources = read_data_sources(hostname, login)
|
|
|
- secret_sources = read_secret_sources(hostname, login)
|
|
|
|
|
container_name = f"borgmatic_{login}"
|
|
container_name = f"borgmatic_{login}"
|
|
|
ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
|
|
ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
|
|
|
|
|
|
|
@@ -72,14 +149,6 @@ def main():
|
|
|
|
|
|
|
|
volume_args = [a for vol in volumes for a in ["-v", vol]]
|
|
volume_args = [a for vol in volumes for a in ["-v", vol]]
|
|
|
|
|
|
|
|
- if not secret_sources:
|
|
|
|
|
- print("no secret required ?")
|
|
|
|
|
-
|
|
|
|
|
- for s in secret_sources:
|
|
|
|
|
- args = ["podman", "secret", "create", "--replace", s.name, s.host_path]
|
|
|
|
|
- print(args)
|
|
|
|
|
- subprocess.run(args)
|
|
|
|
|
-
|
|
|
|
|
secrets_args = [a for s in secret_sources for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]]
|
|
secrets_args = [a for s in secret_sources for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]]
|
|
|
image_name = "ghcr.io/borgmatic-collective/borgmatic"
|
|
image_name = "ghcr.io/borgmatic-collective/borgmatic"
|
|
|
|
|
|
|
@@ -104,5 +173,37 @@ def main():
|
|
|
print(args)
|
|
print(args)
|
|
|
subprocess.run(args)
|
|
subprocess.run(args)
|
|
|
|
|
|
|
|
|
|
+
|
|
|
|
|
+def main():
|
|
|
|
|
+ login = os.getlogin()
|
|
|
|
|
+ hostname = socket.gethostname()
|
|
|
|
|
+
|
|
|
|
|
+ secret_sources = Secret.read_sources(hostname, login)
|
|
|
|
|
+
|
|
|
|
|
+ print(secret_sources)
|
|
|
|
|
+ if not secret_sources:
|
|
|
|
|
+ print("no secret required ?")
|
|
|
|
|
+
|
|
|
|
|
+ try:
|
|
|
|
|
+ if sys.argv[1] == "create_secrets":
|
|
|
|
|
+ keepass_path = Path(sys.argv[2])
|
|
|
|
|
+ keepass = KeePass.new(keepass_path)
|
|
|
|
|
+ for s in secret_sources:
|
|
|
|
|
+ s.create(keepass)
|
|
|
|
|
+
|
|
|
|
|
+ elif sys.argv[1] == "start":
|
|
|
|
|
+ start_borgmatic_container(hostname, login, secret_sources)
|
|
|
|
|
+
|
|
|
|
|
+ elif sys.argv[1] == "rm":
|
|
|
|
|
+ subprocess.run(["podman", "rm", "-f", f"borgmatic_{login}"])
|
|
|
|
|
+
|
|
|
|
|
+ elif sys.argv[1] == "bash":
|
|
|
|
|
+ subprocess.run(["podman", "exec", "-ti", f"borgmatic_{login}", "bash"])
|
|
|
|
|
+
|
|
|
|
|
+ except IndexError:
|
|
|
|
|
+ print("You should provide an argument")
|
|
|
|
|
+ exit(1)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
if __name__ == "__main__":
|
|
if __name__ == "__main__":
|
|
|
main()
|
|
main()
|