Coverage for b4_backup/main/dataclass.py: 100%

85 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-18 22:40 +0000

1from collections.abc import Generator, Iterable 

2from dataclasses import dataclass, field 

3from os import PathLike 

4from pathlib import PurePath, PurePosixPath 

5from typing import TYPE_CHECKING 

6 

7from b4_backup import exceptions 

8from b4_backup.config_schema import DEFAULT, BackupTarget 

9 

10if TYPE_CHECKING: # pragma: no cover 

11 from b4_backup.main.connection import Connection 

12 

13 

14class BackupHostPath(PurePosixPath): 

15 """Represents a path for a Connection.""" 

16 

17 def __init__(self, *segments: str | PathLike[str], connection: "Connection"): 

18 """ 

19 Args: 

20 segments: segments of the path 

21 connection: The connection used for this path. 

22 """ 

23 super().__init__(*segments) 

24 self.connection = connection 

25 

26 def with_segments(self, *segments): # noqa: D102 (Built in) 

27 return type(self)(*segments, connection=self.connection) 

28 

29 def rmdir(self) -> None: 

30 """Removes the given empty directory.""" 

31 try: 

32 self.connection.run_process(["rmdir", str(self)]) 

33 except exceptions.FailedProcessError as e: 

34 if "No such file or directory" not in e.stderr: 

35 raise 

36 

37 def exists(self) -> bool: 

38 """ 

39 Returns: 

40 True if the location exists. 

41 """ 

42 try: 

43 result = self.connection.run_process(["ls", "-d", str(self)]) 

44 except exceptions.FailedProcessError as e: 

45 if "No such file or directory" in e.stderr: 

46 return False 

47 

48 raise 

49 

50 return result.strip() != "" 

51 

52 def mkdir(self, parents: bool = False) -> None: 

53 """ 

54 Creates a directory. 

55 

56 Args: 

57 parents: Also creates parent directories and doesn't fail if path exist. 

58 """ 

59 self.connection.run_process(["mkdir", str(self)] + ["-p"] * parents) 

60 

61 def rename(self, target: PurePath) -> None: 

62 """ 

63 Renames/Moves the path to the target location. 

64 

65 Args: 

66 target: The target location to move the object to. 

67 """ 

68 self.connection.run_process(["mv", str(self), str(target)]) 

69 

70 def iterdir(self) -> list["BackupHostPath"]: 

71 """ 

72 Returns: 

73 A list of Paths containing all items in the current directory. 

74 """ 

75 result = sorted(self.connection.run_process(["ls", str(self)]).strip().split("\n")) 

76 

77 if result == [""]: 

78 return [] 

79 

80 return [self / x for x in result] 

81 

82 def is_dir(self) -> bool: 

83 """Checks if a path is a directory.""" 

84 result = self.connection.run_process(["ls", "-dl", str(self)]) 

85 return result.strip()[0] == "d" 

86 

87 

@dataclass(frozen=True)
class Snapshot:
    """
    Describes a b4_snapshot.

    Attributes:
        name: Name of the snapshot
        subvolumes: Subvolume paths, expected in escaped form (see escape_path)
        base_path: Location of this snapshot
    """

    name: str
    subvolumes: list[BackupHostPath]
    base_path: BackupHostPath

    # Character that stands in for "/" in escaped subvolume paths
    _subvolume_delimiter: str = "!"

    @classmethod
    def from_new(
        cls, name: str, subvolumes: list[BackupHostPath], base_path: BackupHostPath
    ) -> "Snapshot":
        """
        Create instance from the backup target location.

        Args:
            name: Name of the snapshot
            subvolumes: List of subvolumes without delimiter translation
            base_path: Location of this snapshot

        Returns:
            Snapshot instance whose subvolumes are stored escaped
        """
        # cls instead of a hard-coded Snapshot, so subclasses get their own type back
        return cls(
            name=name,
            subvolumes=[cls.escape_path(x) for x in subvolumes],
            base_path=base_path,
        )

    @classmethod
    def escape_path(cls, path: BackupHostPath) -> BackupHostPath:
        """
        Returns:
            Escaped variant of subvolume path.
        """
        return BackupHostPath(
            str(path).replace("/", cls._subvolume_delimiter),
            connection=path.connection,
        )

    @classmethod
    def unescape_path(cls, path: BackupHostPath) -> BackupHostPath:
        """
        Returns:
            Recreates a path from an escaped variant of subvolume path.
        """
        return BackupHostPath(
            str(path).replace(cls._subvolume_delimiter, "/"),
            connection=path.connection,
        )

    @property
    def subvolumes_unescaped(self) -> Generator[BackupHostPath, None, None]:
        """
        Returns:
            Generator over all subvolumes without delimiter translation as relative paths.
        """
        # Same class-level delimiter that escape_path/unescape_path use, instead of a literal "!"
        delimiter = type(self)._subvolume_delimiter

        return (
            self.unescape_path(
                BackupHostPath(
                    # Dropping the leading delimiter(s) turns the unescaped path into a relative one
                    str(x).lstrip(delimiter),
                    connection=x.connection,
                )
            )
            for x in self.subvolumes
        )

153 

154 

@dataclass
class RetentionGroup:
    """
    Contains the retention ruleset for a target.

    Attributes:
        name: Name of the retention ruleset
        target_retention: The retention ruleset for the target itself
        is_source: True if this is a source retention ruleset
        obsolete_snapshots: All snapshots in this set will be considered obsolete
    """

    name: str
    target_retention: dict[str, str]
    is_source: bool = True
    obsolete_snapshots: set[str] = field(default_factory=set)

    @classmethod
    def from_target(
        cls,
        retention_name: str,
        target: BackupTarget,
        is_source: bool = True,
        obsolete_snapshots: set[str] | None = None,
    ) -> "RetentionGroup":
        """
        Create an instance from a target and ruleset name.

        Args:
            retention_name: Name of the retention ruleset to select from the target
            target: Target to get the ruleset from
            is_source: Select source ruleset or destination ruleset
            obsolete_snapshots: All snapshots in this set will be considered obsolete

        Returns:
            RetentionGroup instance
        """
        target_retentions = target.src_retention if is_source else target.dst_retention
        # A missing (or empty) ruleset falls back to the DEFAULT ruleset of the target
        target_retention = target_retentions.get(retention_name) or target_retentions[DEFAULT]

        # cls instead of a hard-coded RetentionGroup, so subclasses get their own type back
        return cls(
            name=retention_name,
            target_retention=target_retention,
            is_source=is_source,
            # Only None is replaced: a caller-supplied set is passed through even when it is empty
            obsolete_snapshots=set() if obsolete_snapshots is None else obsolete_snapshots,
        )

201 

202 

@dataclass(frozen=True)
class ChoiceSelector:
    """
    Describes a set of data, with dynamic choices.

    Attributes:
        data: Contains the actual data
    """

    data: list[str] = field(default_factory=list)

    def resolve_target(self, targets: Iterable[str]) -> list[str]:
        """
        Resolves a target selector and returns a list based on the selection.

        An item that matches a target name exactly selects only that target.
        Any other item selects every target whose path is relative to it.

        Args:
            targets: Names of all available targets

        Returns:
            Sorted list of resolved items, never containing "_default"
        """
        # Materialise once: targets is scanned for every item and may be a one-shot iterator
        available = list(targets)
        known = set(available)

        expanded_data: set[str] = set()
        for item in self.data:
            if item in known:
                expanded_data.add(item)
                continue

            expanded_data.update(
                target_name
                for target_name in available
                if PurePath(target_name).is_relative_to(item)
            )

        # Sorted, so the result does not depend on set iteration order
        return sorted(expanded_data - {"_default"})

    def resolve_retention_name(self, snapshot_names: Iterable[str]) -> list[str]:
        """
        Resolves a retention_name selector and returns a list based on the selection.

        Args:
            snapshot_names: Snapshot names; the retention name is taken to be
                everything after the first "_"

        Returns:
            List of resolved items. For the selector ["ALL"] this is the sorted list of
            retention names found in snapshot_names, otherwise a copy of data.
        """
        if self.data == ["ALL"]:
            # Names without "_" carry no retention name and are skipped instead of raising IndexError
            return sorted({x.split("_", maxsplit=1)[1] for x in snapshot_names if "_" in x})

        # Copy, so callers cannot modify the selector through the returned list
        return list(self.data)