Coverage for b4_backup/main/b4_backup.py: 100%
219 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-18 22:40 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-18 22:40 +0000
1import logging
2import re
3from collections.abc import Iterable
4from dataclasses import dataclass
5from pathlib import PurePath
7import arrow
9from b4_backup import exceptions
10from b4_backup.config_schema import (
11 BaseConfig,
12 SubvolumeFallbackStrategy,
13 TargetRestoreStrategy,
14)
15from b4_backup.main.backup_target_host import (
16 BackupTargetHost,
17 DestinationBackupTargetHost,
18 SourceBackupTargetHost,
19)
20from b4_backup.main.dataclass import (
21 BackupHostPath,
22 ChoiceSelector,
23 RetentionGroup,
24 Snapshot,
25)
27log = logging.getLogger("b4_backup.main")
@dataclass
class B4Backup:
    """
    Main controller class for the backups.

    Bundles the high-level operations (backup, restore, sync, clean, delete)
    and the retention logic that decides which snapshots are kept.

    Args:
        timezone: Timezone to use
    """

    # Timezone used when generating snapshot names and when evaluating
    # time-based retention durations.
    timezone: str = BaseConfig.timezone

    # Matches "<count>", optionally followed by a time unit and an optional plural "s",
    # e.g. "3", "1day" or "2weeks". Group 1 is the count, group 2 the singular unit.
    _size_pattern = re.compile(r"^(?:([0-9]+)(second|minute|hour|day|week|month|year)?s?)$")
    # arrow format string used for the timestamp part of snapshot names.
    _timestamp_fmt = "YYYY-MM-DD-HH-mm-ss"
def backup(
    self,
    src_host: SourceBackupTargetHost,
    dst_host: DestinationBackupTargetHost | None,
    snapshot_name: str,
) -> None:
    """
    Perform a backup for a single target.

    A snapshot is created on the source, sent to the destination (if one is given)
    and afterwards the retention ruleset named in the snapshot name is applied.

    dst_host can be None. In this case nothing is sent and only the snapshot and
    the clean up on the source side are performed.

    Args:
        src_host: An active source host instance
        dst_host: An active destination host instance
        snapshot_name: The name of the new snapshot; the part after the first "_"
            is used as retention name

    Raises:
        IndexError: If snapshot_name does not contain a "_" separated retention name
    """
    log.info("Snapshot name: %s", snapshot_name)

    # Resolve the retention name before touching any host, so that a malformed
    # snapshot name fails before a snapshot has been created or sent.
    retention_names = ChoiceSelector([self._extract_retention_name(snapshot_name)])

    src_host.create_snapshot(snapshot_name)

    if dst_host:
        src_host.send_snapshot(dst_host, snapshot_name)

    self.clean(
        src_host=src_host,
        dst_host=dst_host,
        retention_names=retention_names,
    )
def restore(
    self,
    src_host: SourceBackupTargetHost,
    dst_host: DestinationBackupTargetHost | None,
    snapshot_name: str,
    strategy: TargetRestoreStrategy,
) -> None:
    """
    Restore a snapshot to one or more targets.

    The special snapshot name "REPLACE" reverts the last REPLACE restore and is
    only accepted together with the REPLACE strategy.

    Args:
        src_host: An active source host instance
        dst_host: An active destination host instance
        snapshot_name: Name of the snapshot you want to restore
        strategy: Restore strategy or procedure to apply

    Raises:
        exceptions.SnapshotNotFoundError: If "REPLACE" is requested with another strategy
    """
    use_replace = strategy == TargetRestoreStrategy.REPLACE

    if snapshot_name == "REPLACE":
        if not use_replace:
            raise exceptions.SnapshotNotFoundError(
                "REPLACE can only be restored using REPLACE restore strategy"
            )

        log.info("Reverting last REPLACE restore")
        self._rollback_replace(src_host)
        return

    if use_replace:
        log.info("Using REPLACE restore strategy")
        self._restore_replace(src_host, dst_host, snapshot_name)
        return

    log.info("Using SAFE restore strategy")
    self._restore_safe(src_host, dst_host, snapshot_name)
def sync(
    self,
    src_host: SourceBackupTargetHost,
    dst_host: DestinationBackupTargetHost,
) -> None:
    """
    Send unsent snapshots to the destination and clean them.

    Retention is applied once before sending (so obsolete snapshots are not
    transferred) and once afterwards.

    Args:
        src_host: An active source host instance
        dst_host: An active destination host instance
    """
    self.clean(src_host, dst_host)

    src_snapshots = src_host.snapshots()
    dst_snapshots = dst_host.snapshots()

    # A set difference has no defined iteration order. Sorting makes the send order
    # deterministic; names produced by generate_snapshot_name start with their
    # timestamp, so those are sent oldest first.
    for snapshot_name in sorted(src_snapshots.keys() - dst_snapshots.keys()):
        src_host.send_snapshot(dst_host, snapshot_name)

    self.clean(src_host, dst_host)
def clean(
    self,
    src_host: SourceBackupTargetHost,
    dst_host: DestinationBackupTargetHost | None = None,
    retention_names: ChoiceSelector = ChoiceSelector(["ALL"]),
) -> None:
    """
    Apply a retention ruleset on the selected targets.

    Runs three steps in order: snapshot retention, clean up of replaced targets
    on the source and removal of empty directories.

    Args:
        src_host: An active source host instance
        dst_host: An active destination host instance
        retention_names: Name suffix of this backup (retention ruleset)
    """
    self._clean_target(
        src_host=src_host,
        dst_host=dst_host,
        retention_names=retention_names,
    )
    self._clean_replace(host=src_host)
    self._clean_empty_dirs(src_host=src_host, dst_host=dst_host)
def delete(
    self,
    host: BackupTargetHost,
    snapshot_name: str,
) -> None:
    """
    Delete a specific snapshot from a specific target/host.

    A missing snapshot is not an error; it is only reported with a warning.

    Args:
        host: the selected target host
        snapshot_name: The name of the snapshot to delete
    """
    known_snapshots = host.snapshots()

    if snapshot_name in known_snapshots:
        host.delete_snapshot(known_snapshots[snapshot_name])
        return

    log.warning("Snapshot %s does not exist on %s", snapshot_name, host.type)
def delete_all(
    self,
    host: BackupTargetHost,
    retention_names: ChoiceSelector = ChoiceSelector(["ALL"]),
) -> None:
    """
    Delete all snapshots from a specific target/host/retention item.

    Args:
        host: the selected target host
        retention_names: The retention names the snapshots have to contain
    """
    # Query the host once and reuse the result for resolving the selector
    # and for the deletion loop.
    snapshots = host.snapshots()
    resolved_retention_names = set(retention_names.resolve_retention_name(snapshots))

    for snapshot_name, snapshot in snapshots.items():
        if self._extract_retention_name(snapshot_name) in resolved_retention_names:
            host.delete_snapshot(snapshot)
def _restore_replace(
    self,
    src_host: SourceBackupTargetHost,
    dst_host: DestinationBackupTargetHost | None,
    snapshot_name: str,
) -> None:
    """
    Restore a snapshot in place of the current target.

    The snapshot is first made available on the source (safe restore), then the
    current target is moved aside and the snapshot is restored to its location.
    """
    self._restore_safe(src_host, dst_host, snapshot_name)

    # Location the previous target was moved to (None if there was no target)
    previous_target = self._remove_target(src_host)

    self._restore_snapshot(
        src_host,
        src_host.snapshots()[snapshot_name],
        existing_replaced_target=previous_target,
    )

    self._clean_replace(src_host)
def _restore_safe(
    self,
    src_host: SourceBackupTargetHost,
    dst_host: DestinationBackupTargetHost | None,
    snapshot_name: str,
) -> None:
    """
    Make sure the requested snapshot is available on the source host.

    With a destination host the snapshot is sent from there to the source.
    Without one (offline mode) the snapshot has to exist on the source already.

    Raises:
        exceptions.SnapshotNotFoundError: In offline mode, if the source does not have the snapshot
    """
    if not dst_host:
        log.warning("Running in offline mode. Destination host snapshots are unavailable.")

        if snapshot_name not in src_host.snapshots():
            raise exceptions.SnapshotNotFoundError(snapshot_name)

        return

    dst_host.send_snapshot(src_host, snapshot_name)
def _rollback_replace(self, host: SourceBackupTargetHost) -> None:
    """
    Revert the last REPLACE restore.

    The current target is moved aside and the most recent entry of the "replace"
    directory is moved back to the target location.

    Raises:
        exceptions.SnapshotNotFoundError: If there is no replaced target to roll back to
    """
    replaced_targets_dir = (
        host.mount_point() / host.target_config.src_snapshot_dir / "replace" / host.name
    )

    replaced_targets_dir.mkdir(parents=True)
    # Sort explicitly (as _clean_replace does) instead of relying on the listing order:
    # entries created by _remove_target are named by timestamp, so the last one is the newest.
    replaced_targets = sorted(replaced_targets_dir.iterdir())

    if not replaced_targets:
        raise exceptions.SnapshotNotFoundError("No old replace available to rollback")

    # The listing above was taken before this call, so the target moved aside
    # here is not the one that gets restored.
    self._remove_target(host)

    replaced_targets[-1].rename(host.path())  # move

    self._clean_replace(host)
def _remove_target(self, host: SourceBackupTargetHost) -> BackupHostPath | None:
    """
    Move the current target into the "replace" directory of the host.

    Returns:
        The new location of the moved target, or None if the target does not exist
    """
    if host.path().exists():
        replace_name = self.generate_snapshot_name()
        log.info("Replace name: %s", replace_name)

        snapshot_root = host.mount_point() / host.target_config.src_snapshot_dir
        replace_dir = snapshot_root / "replace" / host.name / replace_name

        replace_dir.parent.mkdir(parents=True)
        host.path().rename(replace_dir)

        return replace_dir

    return None
def generate_snapshot_name(self, name: str | None = None) -> str:
    """
    Generate a name for a new snapshot.

    The name is the current time in the configured timezone, followed by
    "_<name>" if a non-empty name is given.

    Args:
        name: Retention rule name

    Returns:
        Name for a snapshot
    """
    timestamp = arrow.utcnow().to(self.timezone).format(self._timestamp_fmt)

    return f"{timestamp}_{name}" if name else timestamp
def _restore_snapshot(
    self,
    host: SourceBackupTargetHost,
    snapshot: Snapshot,
    existing_replaced_target: BackupHostPath | None = None,
) -> None:
    """
    Recreate the target from a snapshot.

    Every subvolume of the snapshot is snapshotted to its location below the
    connection location. Afterwards the subvolume rules of the target are walked
    to create fallback subvolumes for absolute rule paths.

    Args:
        host: The source host to restore on
        snapshot: The snapshot to restore
        existing_replaced_target: Location of the previously replaced target, if any
    """
    connection = host.connection
    host.path(connection.location.parent).mkdir(parents=True)

    subvolume_pairs = zip(snapshot.subvolumes, snapshot.subvolumes_unescaped)
    for escaped_name, plain_name in subvolume_pairs:
        destination = host.path(connection.location / plain_name)
        destination.rmdir()

        destination.parent.mkdir(parents=True)
        origin = snapshot.base_path / snapshot.name / escaped_name
        connection.run_process(
            ["btrfs", "subvolume", "snapshot", str(origin), str(destination)]
        )

    rules = host.target_config.subvolume_rules
    for rule_path_str in rules:
        rule_path = PurePath(rule_path_str)

        if rule_path.is_absolute():
            self._create_fallback_subvolume(host, rule_path, existing_replaced_target)
            continue

        # Relative rule paths can't be recreated; only warn if the rule isn't dropped anyway
        if rules[rule_path_str].fallback_strategy != SubvolumeFallbackStrategy.DROP:
            log.warning(
                'Can\'t recreate subvolumes from relative subvolume_rule path "%s". Skipped. Use an absolute Path.',
                rule_path_str,
            )
def _create_fallback_subvolume(
    self,
    host: SourceBackupTargetHost,
    subvolume_path: PurePath,
    existing_replaced_target: BackupHostPath | None = None,
) -> None:
    """
    Create a subvolume that is missing after a restore, based on its fallback strategy.

    KEEP moves the subvolume over from the replaced target if it exists there and
    creates a new one otherwise. NEW always creates a new one. Any other strategy
    creates nothing.

    Args:
        host: The source host to operate on
        subvolume_path: Absolute subvolume rule path
        existing_replaced_target: Location of the previously replaced target, if any
    """
    rule_key = str(subvolume_path)
    rule = host.target_config.subvolume_rules[rule_key]

    # Strip the leading "/" to get a path relative to the target
    relative_part = PurePath(rule_key[1:])
    destination = host.path() / relative_part

    if destination.exists():
        log.debug("%s already exist", destination)
        return

    previous: BackupHostPath | None = None
    if existing_replaced_target:
        candidate = existing_replaced_target / relative_part
        if candidate.exists():
            previous = candidate

    destination.parent.mkdir(parents=True)

    strategy = rule.fallback_strategy
    if strategy == SubvolumeFallbackStrategy.KEEP and previous:
        previous.rename(destination)
    elif strategy in (SubvolumeFallbackStrategy.NEW, SubvolumeFallbackStrategy.KEEP):
        # KEEP only gets here when there is nothing to keep
        host.connection.run_process(["btrfs", "subvolume", "create", str(destination)])
def _clean_target(
    self,
    src_host: SourceBackupTargetHost,
    dst_host: DestinationBackupTargetHost | None,
    retention_names: ChoiceSelector,
) -> None:
    """
    Apply the selected retention rulesets on the destination (if given) and the source.

    Args:
        src_host: An active source host instance
        dst_host: An active destination host instance or None
        retention_names: Selector of the retention names to clean
    """

    def build_group(name: str, host: BackupTargetHost, is_source: bool) -> RetentionGroup:
        return RetentionGroup.from_target(
            retention_name=name,
            target=host.target_config,
            is_source=is_source,
        )

    src_retentions: list[RetentionGroup] = []
    src_dst_retentions: list[RetentionGroup] = []
    for name in retention_names.resolve_retention_name(src_host.snapshots()):
        src_retentions.append(build_group(name, src_host, True))
        # Unsent snapshots (from an offline backup) must not be deleted on the source.
        # They may only go once the destination retention flags them as obsolete, too.
        src_dst_retentions.append(build_group(name, src_host, False))

    if dst_host:
        dst_retentions: list[RetentionGroup] = [
            build_group(name, dst_host, False)
            for name in retention_names.resolve_retention_name(dst_host.snapshots())
        ]
        self._apply_retention(dst_host, dst_retentions)

        # Snapshots that were already sent can be deleted on the source,
        # unless the source retention keeps them.
        already_sent = set(dst_host.snapshots())
        for group in src_dst_retentions:
            group.obsolete_snapshots = already_sent

    self._apply_retention(src_host, src_retentions + src_dst_retentions)
def _apply_retention(
    self,
    host: BackupTargetHost,
    retentions: Iterable[RetentionGroup],
) -> None:
    """
    Delete all snapshots on a host that are not retained by the given retention groups.

    Snapshots retained by no group are deleted completely. For snapshots retained
    only by non-source groups, the subvolumes reported by
    host.source_subvolumes_from_snapshot are deleted.

    Args:
        host: The host to clean
        retentions: Retention groups to evaluate
    """
    # The groups are walked twice below; materialise them so that a one-shot
    # iterable (e.g. a generator) is not exhausted after the first pass.
    retentions = list(retentions)

    # We only want to clean the selected retention_names
    snapshots = self._filter_snapshots(host.snapshots(), [x.name for x in retentions])

    retained_destination_snapshots: set[str] = set()
    retained_source_snapshots: set[str] = set()
    for retention in retentions:
        retained = self._retained_snapshots(
            snapshots,
            retention.target_retention,
            retention.name,
            retention.obsolete_snapshots,
        )
        if retention.is_source:
            retained_source_snapshots |= retained
        else:
            retained_destination_snapshots |= retained

    unretained = snapshots.keys() - (retained_source_snapshots | retained_destination_snapshots)
    for snapshot_name in sorted(unretained):
        host.delete_snapshot(snapshots[snapshot_name])

    for snapshot_name in sorted(retained_destination_snapshots - retained_source_snapshots):
        snapshot = snapshots[snapshot_name]
        host.delete_snapshot(
            snapshot,
            subvolumes=list(host.source_subvolumes_from_snapshot(snapshot)),
        )
def _filter_snapshots(
    self, snapshots: dict[str, Snapshot], retention_names: Iterable[str]
) -> dict[str, Snapshot]:
    """
    Return only the snapshots whose retention name is one of retention_names.

    Args:
        snapshots: Snapshots by snapshot name
        retention_names: Retention names to keep

    Returns:
        The matching subset of snapshots, in the original order
    """
    selected: dict[str, Snapshot] = {}
    for snapshot_name, snapshot in snapshots.items():
        if self._extract_retention_name(snapshot_name) in retention_names:
            selected[snapshot_name] = snapshot

    return selected
435 def _extract_retention_name(self, snapshot_name: str) -> str:
436 return snapshot_name.split("_", maxsplit=1)[1]
def _clean_replace(self, host: SourceBackupTargetHost) -> None:
    """
    Remove obsolete replaced targets of a host.

    Only the newest replaced target can survive, and only as long as it is
    retained by the configured replaced_target_ttl. All older ones are removed.
    """
    replaced_targets_dir = (
        host.mount_point() / host.target_config.src_snapshot_dir / "replace" / host.name
    )

    replaced_targets_dir.mkdir(parents=True)
    newest_first = sorted(replaced_targets_dir.iterdir(), reverse=True)

    if not newest_first:
        return

    newest, *older = newest_first

    # Off-label use of the retention logic: an empty result means
    # the newest replaced target has outlived its ttl.
    still_retained = self._retained_snapshots(
        [newest.name], {"all": host.target_config.replaced_target_ttl}
    )
    if not still_retained:
        self._remove_replaced_targets(host, newest)

    for stale_target in older:
        self._remove_replaced_targets(host, stale_target)
def _clean_empty_dirs(
    self,
    src_host: SourceBackupTargetHost,
    dst_host: DestinationBackupTargetHost | None,
) -> None:
    """
    Remove empty directories below the source snapshot dir and, if a destination
    host is given, below the destination path.
    """
    src_host.remove_empty_dirs(src_host.snapshot_dir)

    if not dst_host:
        return

    dst_host.remove_empty_dirs(dst_host.path())
def _remove_replaced_targets(
    self, host: SourceBackupTargetHost, replaced_target: PurePath
) -> None:
    """
    Delete every subvolume of the host that is located at or below replaced_target.

    The subvolumes are deleted in reverse listing order.
    """
    affected = []
    for subvolume in host.subvolumes():
        if subvolume.is_relative_to(replaced_target):
            affected.append(subvolume)

    for subvolume in affected[::-1]:
        host.connection.run_process(["btrfs", "subvolume", "delete", str(subvolume)])
476 def _transpose_snapshot_subvolumes(
477 self, snapshots: dict[str, Snapshot]
478 ) -> dict[BackupHostPath, set[str]]:
479 return_dict: dict[BackupHostPath, set[str]] = {}
480 for snapshot_name, snapshot in snapshots.items():
481 for subvolume in snapshot.subvolumes:
482 if subvolume not in return_dict:
483 return_dict[subvolume] = set()
485 return_dict[subvolume].add(snapshot_name)
487 return return_dict
def _retained_snapshots(
    self,
    snapshot_names: Iterable[str],
    retention: dict[str, str],
    retention_name: str | None = None,
    ignored_snapshots: set[str] | None = None,
) -> set[str]:
    """
    Determine which snapshots are kept by a retention ruleset.

    Args:
        snapshot_names: Names of the snapshots to evaluate
        retention: Retention rules as interval -> duration
        retention_name: If set, only snapshots with this retention name are evaluated
        ignored_snapshots: Snapshot names that are never reported as retained

    Returns:
        Names of the retained snapshots
    """
    excluded = ignored_snapshots or set()

    # NOTE(review): the timestamps are parsed without an explicit timezone, while
    # generate_snapshot_name formats them in self.timezone - confirm that this is intended.
    snapshot_dates = []
    for snapshot_name in snapshot_names:
        if retention_name and snapshot_name.split("_", maxsplit=1)[1] != retention_name:
            continue
        snapshot_dates.append(arrow.get(snapshot_name.split("_")[0], self._timestamp_fmt))

    kept_dates: set[arrow.Arrow] = set()
    for interval, duration in retention.items():
        kept_dates.update(self._apply_retention_rule(interval, duration, snapshot_dates))

    # Rebuild the snapshot names from the dates that survived
    suffix = "" if retention_name is None else f"_{retention_name}"
    kept_names = {date.format(self._timestamp_fmt) + suffix for date in kept_dates}

    return {name for name in kept_names if name not in excluded}
def _apply_retention_rule(
    self, interval_str: str, duration_str: str, dates: list[arrow.Arrow]
) -> list[arrow.Arrow]:
    """
    Select the dates that are kept by a single retention rule.

    The dates are walked from newest to oldest. The walk stops as soon as the
    duration is exceeded (either an age limit or a maximum count). The newest
    date is always kept; a further date is kept if the interval is "all" or if it
    lies outside the interval box of the previously kept date.

    Args:
        interval_str: Interval of the rule, e.g. "all" or "1day"
        duration_str: Duration of the rule, e.g. "forever", "3" or "2weeks"
        dates: Dates to evaluate

    Returns:
        The retained dates, newest first
    """
    interval_size, interval_magnitude = self._timebox_str_extract(
        interval_str, is_interval=True
    )
    duration_size, duration_magnitude = self._timebox_str_extract(
        duration_str, is_interval=False
    )

    kept: list[arrow.Arrow] = []
    for candidate in sorted(dates, reverse=True):
        if duration_magnitude != "forever":
            if duration_magnitude is None:
                # Count based duration
                exceeded = len(kept) >= duration_size
            else:
                # Time based duration
                oldest_allowed = (
                    arrow.utcnow().to(self.timezone).shift(**{duration_magnitude: -duration_size})
                )
                exceeded = candidate < oldest_allowed

            if exceeded:
                break

        if not kept or interval_magnitude == "all":
            kept.append(candidate)
            continue

        assert interval_magnitude is not None

        min_box, max_box = (
            kept[-1]
            .shift(**{interval_magnitude: 1 - interval_size})
            .span(interval_magnitude, count=interval_size)  # type: ignore
        )
        if candidate < min_box or candidate >= max_box:
            kept.append(candidate)

    return kept
558 def _timebox_str_extract(
559 self, timebox_str: str, is_interval: bool = False
560 ) -> tuple[int, str | None]:
561 if is_interval and timebox_str == "all":
562 return 0, "all"
564 if not is_interval and timebox_str == "forever":
565 return 0, "forever"
567 size_pattern = self._size_pattern.match(timebox_str)
568 if not size_pattern:
569 raise exceptions.InvalidRetentionRuleError(
570 f"Size pattern ({timebox_str}, interval:{is_interval}) is invalid"
571 )
573 size = size_pattern.group(1) or 0
574 magnitude = size_pattern.group(2)
576 if magnitude is not None:
577 magnitude += "s"
579 return int(size), magnitude