podman.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import subprocess
  2. from pathlib import Path
  3. class Podman:
  4. @classmethod
  5. def run(
  6. cls,
  7. image: str,
  8. name: str,
  9. *,
  10. hostname: str,
  11. env: list,
  12. volumes: list[str],
  13. # TODO: Actually a list of Secret but creates a circular dependency
  14. secrets: list,
  15. ssh_auth_sock: Path | None = None,
  16. detach=True,
  17. ):
  18. args = ["run"]
  19. args += ["-h", hostname]
  20. args += ["--name", name]
  21. for e in env:
  22. args += ["-e", e]
  23. if ssh_auth_sock:
  24. args += ["-e", "SSH_AUTH_SOCK"]
  25. volumes += [f"{ssh_auth_sock}:{ssh_auth_sock}:Z"]
  26. args += ["--security-opt=label=disable"]
  27. if detach:
  28. args += ["--detach"]
  29. args += [a for vol in volumes for a in ["-v", vol]]
  30. args += [a for s in secrets for a in ["--secret", f"{s.name},mode=0{s.mode:o}"]]
  31. args += [image]
  32. cls._call(args)
  33. @classmethod
  34. def rm(cls, name, *, force=True):
  35. args = ["rm"]
  36. if force:
  37. args += ["-f"]
  38. args += [name]
  39. cls._call(args)
  40. @classmethod
  41. def exec(cls, name, *cmd):
  42. args = ["exec", "-ti", name] + list(cmd)
  43. cls._call(args)
  44. @classmethod
  45. def secret_create(
  46. cls,
  47. name: str,
  48. *,
  49. replace=True,
  50. value: bytes | None = None,
  51. host_path: Path | None = None,
  52. ):
  53. if value and host_path:
  54. raise ValueError("both value and host_path can not be set")
  55. args = ["secret", "create"]
  56. kwargs = {}
  57. if replace:
  58. args += ["--replace"]
  59. args += [name]
  60. if value is not None:
  61. args += ["-"]
  62. kwargs["input"] = value
  63. elif host_path is not None:
  64. args += [host_path]
  65. cls._call(args, **kwargs)
  66. @classmethod
  67. def _call(cls, args: list, **kwargs):
  68. args = ["podman"] + args
  69. print(f"Executing `{" ".join(args)}`")
  70. subprocess.run(args, **kwargs)