6 Komitmen a1e32e39b6 ... 89ef30b89b

Pembuat SHA1 Pesan Tanggal
  jherve 89ef30b89b Use inherited classes 1 bulan lalu
  jherve 7e7d17caa0 Allow to read attachment as well 1 bulan lalu
  jherve ff9d23c86f Allow to read secrets from keepass 1 bulan lalu
  jherve fecaa85204 Add new commands 1 bulan lalu
  jherve bd2255e19b Add argument to cli 1 bulan lalu
  jherve e7d86c79a2 A bit of refactor 1 bulan lalu
1 mengubah file dengan 121 tambahan dan 20 penghapusan
  1. 121 20
      start.py

+ 121 - 20
start.py

@@ -1,8 +1,10 @@
 import os
+import sys
 import subprocess
 import socket
 from pathlib import Path, PurePosixPath
 from dataclasses import dataclass
+from typing import Any
 
 is_windows = os.name == "nt"
 
@@ -14,23 +16,101 @@ def read_data_sources(hostname: str, login: str) -> list[Path]:
         return [Path(p_str.strip()).expanduser() for p_str in paths]
 
 
+@dataclass
+class KeePass:
+    path: Path
+    bin: str | Path
+
+    def read_entry_attribute(self, key, attribute):
+        return self._exec(["show", "-a", attribute, self.path, key]).strip()
+
+    def read_entry_attachment(self, key, attachment):
+        return self._exec(["attachment-export", "--stdout", self.path, key, attachment, "/dev/null"])
+
+    def _exec(self, args: list[Any]):
+        try:
+            return subprocess.check_output([self.bin] + args, text=True)
+        except subprocess.CalledProcessError as e:
+            print("\nThere was an error on call to keepass, please check the outout")
+            exit(1)
+
+    @classmethod
+    def new(cls, path: Path):
+        binary = Path("C:\\") / "Program Files" / "KeePassXC" / "keepassxc-cli.exe" if is_windows else "keepassxc-cli"
+        return cls(path=path, bin=binary)
+
+
 @dataclass
 class Secret:
-    host_path: Path
     name: str
     mode: int
 
+    def create(self, keepass: KeePass):
+        ...
+
     @classmethod
     def from_line(cls, line: str):
-        path = Path(line).expanduser()
-        return cls(host_path=path, name=path.name, mode=0o0400)
+        type_, *args = line.split(",")
+        match type_:
+            case "file":
+                return SecretFile.from_line(*args)
+            case "keepass-attribute":
+                return SecretKeepassAttribute.from_line(*args)
+            case "keepass-attachment":
+                return SecretKeepassAttachment.from_line(*args)
 
+    @classmethod
+    def read_sources(cls, hostname: str, login: str) -> list["Secret"]:
+        file = Path(f"./secret_sources_{hostname}_{login}")
+        with open(file) as f:
+            lines = f.readlines()
+            return [cls.from_line(l.strip()) for l in lines]
 
-def read_secret_sources(hostname: str, login: str) -> list[Secret]:
-    file = Path(f"./secret_sources_{hostname}_{login}")
-    with open(file) as f:
-        lines = f.readlines()
-        return [Secret.from_line(l.strip()) for l in lines]
+@dataclass
+class SecretKeepassAttachment(Secret):
+    key: str
+    attachment: str
+
+    def create(self, keepass: KeePass):
+        value = keepass.read_entry_attachment(self.key, self.attachment)
+        args = ["podman", "secret", "create", "--replace", self.name, "-"]
+        print(args)
+        subprocess.run(args, input=value.encode())
+
+    @classmethod
+    def from_line(cls, key, attachment):
+        return cls(name=key, key=key, mode=0o0400, attachment=attachment)
+
+
+@dataclass
+class SecretKeepassAttribute(Secret):
+    key: str
+    attribute: str
+
+    def create(self, keepass: KeePass):
+        value = keepass.read_entry_attribute(self.key, self.attribute)
+        args = ["podman", "secret", "create", "--replace", self.name, "-"]
+        print(args)
+        subprocess.run(args, input=value.encode())
+
+    @classmethod
+    def from_line(cls, key, attribute):
+        return cls(name=key, key=key, mode=0o0400, attribute=attribute)
+
+
+@dataclass
+class SecretFile(Secret):
+    host_path: Path
+
+    def create(self, keepass: KeePass):
+        args = ["podman", "secret", "create", "--replace", self.name, self.host_path]
+        print(args)
+        subprocess.run(args)
+
+    @classmethod
+    def from_line(cls, path: str):
+        path = Path(path).expanduser()
+        return cls(host_path=path, name=path.name, mode=0o0400) 
 
 
 def to_source_path(path: Path):
@@ -40,11 +120,8 @@ def to_source_path(path: Path):
     return mount_base / with_drive.relative_to(with_drive.anchor)
 
 
-def main():
-    login = os.getlogin()
-    hostname = socket.gethostname()
+def start_borgmatic_container(hostname: str, login: str, secret_sources: list[Secret]):
     data_sources = read_data_sources(hostname, login)
-    secret_sources = read_secret_sources(hostname, login)
     container_name = f"borgmatic_{login}"
     ssh_auth_sock = os.getenv("SSH_AUTH_SOCK")
 
@@ -72,14 +149,6 @@ def main():
 
     volume_args = [a for vol in volumes for a in ["-v", vol]]
 
-    if not secret_sources:
-        print("no secret required ?")
-
-    for s in secret_sources:
-        args = ["podman", "secret", "create", "--replace", s.name, s.host_path]
-        print(args)
-        subprocess.run(args)
-
     secrets_args = [a for s in secret_sources for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]]
     image_name = "ghcr.io/borgmatic-collective/borgmatic"
 
@@ -104,5 +173,37 @@ def main():
     print(args)
     subprocess.run(args)
 
+
+def main():
+    login = os.getlogin()
+    hostname = socket.gethostname()
+
+    secret_sources = Secret.read_sources(hostname, login)
+
+    print(secret_sources)
+    if not secret_sources:
+        print("no secret required ?")
+
+    try:
+        if sys.argv[1] == "create_secrets":
+            keepass_path = Path(sys.argv[2])
+            keepass = KeePass.new(keepass_path)
+            for s in secret_sources:
+                s.create(keepass)
+
+        elif sys.argv[1] == "start":
+            start_borgmatic_container(hostname, login, secret_sources)
+
+        elif sys.argv[1] == "rm":
+            subprocess.run(["podman", "rm", "-f", f"borgmatic_{login}"])
+
+        elif sys.argv[1] == "bash":
+            subprocess.run(["podman", "exec", "-ti", f"borgmatic_{login}", "bash"])
+
+    except IndexError:
+        print("You should provide an argument")
+        exit(1)
+
+
 if __name__ == "__main__":
     main()