Coverage for b4_backup/main/dataclass.py: 100%
85 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-18 22:40 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-18 22:40 +0000
1from collections.abc import Generator, Iterable
2from dataclasses import dataclass, field
3from os import PathLike
4from pathlib import PurePath, PurePosixPath
5from typing import TYPE_CHECKING
7from b4_backup import exceptions
8from b4_backup.config_schema import DEFAULT, BackupTarget
10if TYPE_CHECKING: # pragma: no cover
11 from b4_backup.main.connection import Connection
14class BackupHostPath(PurePosixPath):
15 """Represents a path for a Connection."""
17 def __init__(self, *segments: str | PathLike[str], connection: "Connection"):
18 """
19 Args:
20 segments: segments of the path
21 connection: The connection used for this path.
22 """
23 super().__init__(*segments)
24 self.connection = connection
26 def with_segments(self, *segments): # noqa: D102 (Built in)
27 return type(self)(*segments, connection=self.connection)
29 def rmdir(self) -> None:
30 """Removes the given empty directory."""
31 try:
32 self.connection.run_process(["rmdir", str(self)])
33 except exceptions.FailedProcessError as e:
34 if "No such file or directory" not in e.stderr:
35 raise
37 def exists(self) -> bool:
38 """
39 Returns:
40 True if the location exists.
41 """
42 try:
43 result = self.connection.run_process(["ls", "-d", str(self)])
44 except exceptions.FailedProcessError as e:
45 if "No such file or directory" in e.stderr:
46 return False
48 raise
50 return result.strip() != ""
52 def mkdir(self, parents: bool = False) -> None:
53 """
54 Creates a directory.
56 Args:
57 parents: Also creates parent directories and doesn't fail if path exist.
58 """
59 self.connection.run_process(["mkdir", str(self)] + ["-p"] * parents)
61 def rename(self, target: PurePath) -> None:
62 """
63 Renames/Moves the path to the target location.
65 Args:
66 target: The target location to move the object to.
67 """
68 self.connection.run_process(["mv", str(self), str(target)])
70 def iterdir(self) -> list["BackupHostPath"]:
71 """
72 Returns:
73 A list of Paths containing all items in the current directory.
74 """
75 result = sorted(self.connection.run_process(["ls", str(self)]).strip().split("\n"))
77 if result == [""]:
78 return []
80 return [self / x for x in result]
82 def is_dir(self) -> bool:
83 """Checks if a path is a directory."""
84 result = self.connection.run_process(["ls", "-dl", str(self)])
85 return result.strip()[0] == "d"
88@dataclass(frozen=True)
89class Snapshot:
90 """Describes a b4_snapshot."""
92 name: str
93 subvolumes: list[BackupHostPath]
94 base_path: BackupHostPath
96 _subvolume_delimiter: str = "!"
98 @classmethod
99 def from_new(
100 cls, name: str, subvolumes: list[BackupHostPath], base_path: BackupHostPath
101 ) -> "Snapshot":
102 """
103 Create instance from the backup target location.
105 Args:
106 name: Name of the snapshot
107 subvolumes: List of subvolumes without delimiter translation
108 base_path: Location of this snapshot
109 """
110 return Snapshot(
111 name=name,
112 subvolumes=[cls.escape_path(x) for x in subvolumes],
113 base_path=base_path,
114 )
116 @classmethod
117 def escape_path(cls, path: BackupHostPath) -> BackupHostPath:
118 """
119 Returns:
120 Escaped variant of subvolume path.
121 """
122 return BackupHostPath(
123 str(path).replace("/", cls._subvolume_delimiter),
124 connection=path.connection,
125 )
127 @classmethod
128 def unescape_path(cls, path: BackupHostPath) -> BackupHostPath:
129 """
130 Returns:
131 Recreates a path from an escaped variant of subvolume path.
132 """
133 return BackupHostPath(
134 str(path).replace(cls._subvolume_delimiter, "/"),
135 connection=path.connection,
136 )
138 @property
139 def subvolumes_unescaped(self) -> Generator[BackupHostPath, None, None]:
140 """
141 Returns:
142 List all subvolumes without delimiter translation as relative paths.
143 """
144 return (
145 self.unescape_path(
146 BackupHostPath(
147 str(x).lstrip("!"),
148 connection=x.connection,
149 )
150 )
151 for x in self.subvolumes
152 )
155@dataclass
156class RetentionGroup:
157 """
158 Contains the retention ruleset for a target.
160 Attributes:
161 name: Name of the retention ruleset
162 target_retention: The retention ruleset for the target itself
163 is_source: True if this is a source retention ruleset
164 obsolete_snapshots: All snapshots in this set will be condidered obsolete
165 """
167 name: str
168 target_retention: dict[str, str]
169 is_source: bool = True
170 obsolete_snapshots: set[str] = field(default_factory=set)
172 @classmethod
173 def from_target(
174 cls,
175 retention_name: str,
176 target: BackupTarget,
177 is_source: bool = True,
178 obsolete_snapshots: set[str] | None = None,
179 ) -> "RetentionGroup":
180 """
181 Create an instance from a target and ruleset name.
183 Args:
184 retention_name: Name of the retention ruleset to select from the target
185 target: Target to get the ruleset from
186 is_source: Select source ruleset or destination ruleset
187 obsolete_snapshots: All snapshots in this set will be condidered obsolete
189 Returns:
190 RetentionGroup instance
191 """
192 target_retentions = target.src_retention if is_source else target.dst_retention
193 target_retention = target_retentions.get(retention_name) or target_retentions[DEFAULT]
195 return RetentionGroup(
196 name=retention_name,
197 target_retention=target_retention,
198 is_source=is_source,
199 obsolete_snapshots=obsolete_snapshots or set(),
200 )
203@dataclass(frozen=True)
204class ChoiceSelector:
205 """
206 Describes a set of data, with dynamic choices.
208 Attributes:
209 data: Contains the actual data
210 """
212 data: list[str] = field(default_factory=list)
214 def resolve_target(self, targets: Iterable[str]) -> list[str]:
215 """
216 Resolves a target selector and returns a list based on the selection.
218 Returns:
219 List of resolved items
220 """
221 expanded_data: set[str] = set()
222 for item in self.data:
223 if item in targets:
224 expanded_data.add(item)
225 continue
227 for target_name in targets:
228 if PurePath(target_name).is_relative_to(item):
229 expanded_data.add(target_name)
231 return list(expanded_data - {"_default"})
233 def resolve_retention_name(self, snapshot_names: Iterable[str]) -> list[str]:
234 """
235 Resolves a retention_name selector and returns a list based on the selection.
237 Returns:
238 List of resolved items
239 """
240 if self.data == ["ALL"]:
241 return list({x.split("_", maxsplit=1)[1] for x in snapshot_names})
243 return self.data