Coverage for b4_backup/main/backup_target_host.py: 100%
199 statements
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-18 22:40 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2025-06-18 22:40 +0000
1import contextlib
2import logging
3import shlex
4from abc import ABCMeta, abstractmethod
5from collections import defaultdict
6from collections.abc import Generator, Iterable, Sequence
7from dataclasses import dataclass
8from pathlib import PurePath
10from b4_backup import exceptions
11from b4_backup.config_schema import (
12 BackupTarget,
13 OnDestinationDirNotFound,
14 SubvolumeBackupStrategy,
15)
16from b4_backup.main.connection import Connection, LocalConnection, SSHConnection
17from b4_backup.main.dataclass import BackupHostPath, ChoiceSelector, Snapshot
18from b4_backup.utils import contains_path
20log = logging.getLogger("b4_backup.main")
23@dataclass
24class BackupTargetHost(metaclass=ABCMeta):
25 """
26 Describes a host containing backups. Can be source and destination.
28 Attributes:
29 name: The name of the TargetHost
30 target_config: The Config object describing this BackupTarget
31 snapshot_dir: Path to the snapshots of this target on this host
32 connection: Connection object to the host
33 """
35 name: str
36 target_config: BackupTarget
37 snapshot_dir: BackupHostPath
38 connection: Connection
40 @classmethod
41 def from_source_host(
42 cls,
43 target_name: str,
44 target_config: BackupTarget,
45 connection: Connection,
46 ) -> "SourceBackupTargetHost":
47 """
48 Create an instance for a backup source.
50 Args:
51 target_name: Name of the target
52 target_config: Target config
53 connection: Host connection
55 Returns:
56 BackupHost instance
57 """
58 target_snapshot_dir = (
59 BackupTargetHost._mount_point(connection)
60 / target_config.src_snapshot_dir
61 / "snapshots"
62 / target_name
63 )
65 return SourceBackupTargetHost(
66 name=target_name,
67 target_config=target_config,
68 snapshot_dir=BackupHostPath(target_snapshot_dir, connection=connection),
69 connection=connection,
70 )
72 @classmethod
73 def from_destination_host(
74 cls,
75 target_name: str,
76 target_config: BackupTarget,
77 connection: Connection,
78 ) -> "DestinationBackupTargetHost":
79 """
80 Create an instance for a backup destination.
82 Args:
83 target_name: Name of the target
84 target_config: Target config
85 connection: Host connection
87 Returns:
88 BackupHost instance
89 """
90 host = DestinationBackupTargetHost(
91 name=target_name,
92 target_config=target_config,
93 snapshot_dir=BackupHostPath(
94 connection.location / "snapshots" / target_name, connection=connection
95 ),
96 connection=connection,
97 )
99 if (
100 target_config.if_dst_dir_not_found == OnDestinationDirNotFound.FAIL
101 and not host.path().exists()
102 ):
103 raise exceptions.DestinationDirectoryNotFoundError(
104 "Destination directory does not exist. B4 is configured to fail. Is the device mounted properly?"
105 )
107 return host
109 @classmethod
110 def _mount_point(cls, connection: Connection) -> BackupHostPath:
111 result = connection.run_process(["mount"])
113 possible_paths: list[PurePath] = []
114 for line in result.split("\n"):
115 if line == "":
116 continue
118 columns = line.split()
119 path = PurePath(columns[2])
121 if columns[4] != "btrfs" or not connection.location.is_relative_to(path):
122 continue
124 possible_paths.append(path)
126 if len(possible_paths) == 0:
127 raise exceptions.BtrfsPartitionNotFoundError(
128 f"{connection.location} is not located on a valid btrfs partition"
129 )
131 return BackupHostPath(sorted(possible_paths)[-1], connection=connection)
133 def mount_point(self) -> BackupHostPath:
134 """
135 Returns:
136 the mount point of the target location.
137 """
138 return self._mount_point(self.connection)
140 @property
141 @abstractmethod
142 def type(self) -> str:
143 """
144 Returns:
145 if it's a source or destination host.
146 """
148 def subvolumes(self) -> list[BackupHostPath]:
149 """
150 Returns:
151 A list of btrfs subvolumes.
152 """
153 mount_point = self.mount_point()
155 result = self.connection.run_process(["btrfs", "subvolume", "list", str(mount_point)])
156 result = result.replace("top level", "top_level")
158 # Format looking like this per line:
159 # ID 256 gen 621187 top_level 5 path my_data
160 return sorted(
161 [
162 mount_point / value
163 for line in result.split("\n")
164 # Iterate two items at a time
165 for key, value in zip(*[iter(line.split())] * 2) # type: ignore
166 if key == "path"
167 ]
168 + [mount_point]
169 )
171 def remove_empty_dirs(
172 self, path: BackupHostPath, _subvolumes: set[BackupHostPath] | None = None
173 ) -> bool:
174 """
175 Recursively delete empty directories.
177 Returns:
178 True if the top dir got deleted.
179 """
180 if _subvolumes is None:
181 _subvolumes = set(self.subvolumes())
183 empty = True
184 for subpath in path.iterdir():
185 if (
186 subpath in _subvolumes
187 or not subpath.is_dir()
188 or not self.remove_empty_dirs(subpath, _subvolumes=_subvolumes)
189 ):
190 empty = False
192 if empty:
193 log.debug("Removing empty dir: %s", path)
194 path.rmdir()
196 return empty
198 def _group_subvolumes(
199 self, subvolumes: Sequence[BackupHostPath], parent_dir: BackupHostPath
200 ) -> dict[str, list[BackupHostPath]]:
201 relevant_subvolumes = [
202 x
203 for x in subvolumes
204 if x.is_relative_to(parent_dir) and len(x.relative_to(parent_dir).parts) >= 1
205 ]
207 result_dict: dict[str, list] = {}
208 for subvol in relevant_subvolumes:
209 group = subvol.relative_to(parent_dir)
210 group_name = group.parts[0]
211 group_subdir = group.relative_to(group_name)
213 if group_name not in result_dict:
214 result_dict[group_name] = []
216 result_dict[group_name].append(group_subdir)
218 return result_dict
220 def snapshots(self) -> dict[str, Snapshot]:
221 """
222 Returns:
223 All snapshots for that host/target.
224 """
225 return {
226 k: Snapshot(
227 name=k,
228 subvolumes=v,
229 base_path=self.snapshot_dir,
230 )
231 for k, v in self._group_subvolumes(
232 self.subvolumes(),
233 self.snapshot_dir,
234 ).items()
235 }
237 def path(self, path: PurePath | str | None = None) -> BackupHostPath:
238 """
239 Create a BackupHostPath instance.
241 Args:
242 path: Pathlike object. If None, the connection location will be used
244 Returns:
245 BackupHostPath instance
246 """
247 if path is None:
248 path = self.connection.location
250 return BackupHostPath(path, connection=self.connection)
252 def delete_snapshot(
253 self,
254 snapshot: Snapshot,
255 subvolumes: list[BackupHostPath] | None = None,
256 ) -> None:
257 """
258 Delete a snapshot.
260 Args:
261 snapshot: Snapshot to delete
262 subvolumes: Subvolumes to delete. If None, all subvolumes are deleted
263 """
264 if subvolumes is None:
265 subvolumes = snapshot.subvolumes
267 for subvolume in snapshot.subvolumes:
268 if subvolume not in subvolumes:
269 continue
271 subvolume_dir = snapshot.base_path / snapshot.name / subvolume
273 log.info("Delete snapshot %s on %s", str(snapshot.name / subvolume), self.type)
274 self.connection.run_process(["btrfs", "subvolume", "delete", str(subvolume_dir)])
276 if subvolumes == snapshot.subvolumes:
277 (snapshot.base_path / snapshot.name).rmdir()
279 def _get_nearest_matching_snapshot(
280 self,
281 snapshot_name: str,
282 src_group_names: set[str],
283 dst_group_names: set[str],
284 ) -> str | None:
285 matching_groups = sorted(src_group_names & dst_group_names)
287 if not matching_groups:
288 return None
290 return (
291 [x for x in sorted(matching_groups, reverse=True) if x < snapshot_name]
292 + [x for x in sorted(matching_groups) if x > snapshot_name]
293 )[0]
295 def _map_parent_snapshots(
296 self, new_snapshot: Snapshot, parent_snapshot: Snapshot
297 ) -> dict[PurePath, bool]:
298 parent_snapshot_set = set(parent_snapshot.subvolumes)
299 return {x: x in parent_snapshot_set for x in new_snapshot.subvolumes}
301 @classmethod
302 def _filter_subvolumes(
303 cls, subvolumes: Iterable[BackupHostPath], search_paths: list[PurePath]
304 ) -> Generator[BackupHostPath, None, None]:
305 return (
306 x
307 for x in subvolumes
308 if any(contains_path(x, search_path) for search_path in search_paths)
309 )
311 def source_subvolumes_from_snapshot(
312 self, snapshot: Snapshot
313 ) -> Generator[BackupHostPath, None, None]:
314 """
315 Retrieve subvolumes that are marked as source only and ignore from a snapshot.
317 Args:
318 snapshot: Snapshot to retrieve the subvolumes from
320 Returns:
321 Generator of subvolumes
322 """
323 return (
324 Snapshot.escape_path(x)
325 for x in self.filter_subvolumes_by_backup_strategy(
326 snapshot.subvolumes_unescaped,
327 {SubvolumeBackupStrategy.SOURCE_ONLY, SubvolumeBackupStrategy.IGNORE},
328 )
329 )
331 def filter_subvolumes_by_backup_strategy(
332 self,
333 subvolumes: Iterable[BackupHostPath],
334 backup_strategies: set[SubvolumeBackupStrategy],
335 ) -> Generator[BackupHostPath, None, None]:
336 """
337 Retrieve subvolumes that are marked as source only from a snapshot.
339 Args:
340 subvolumes: Subvolumes to filter
341 backup_strategies: Backup strategies to search for
343 Returns:
344 Generator of subvolumes
345 """
346 return self._filter_subvolumes(
347 (self.path("/") / x for x in subvolumes),
348 [
349 PurePath(k)
350 for k, v in self.target_config.subvolume_rules.items()
351 if v.backup_strategy in backup_strategies
352 ],
353 )
355 def _remove_source_subvolumes(self, snapshots: dict[str, Snapshot]) -> None:
356 for snapshot in snapshots.values():
357 for subvolume in list(self.source_subvolumes_from_snapshot(snapshot)):
358 snapshot.subvolumes.remove(subvolume)
360 def send_snapshot(
361 self,
362 destination: "BackupTargetHost",
363 snapshot_name: str,
364 send_con: LocalConnection = LocalConnection(PurePath()),
365 incremental: bool = True,
366 ) -> None:
367 """
368 Send a snapshot to the destination host.
370 Args:
371 destination: Destination host
372 snapshot_name: snapshot to transmit
373 send_con: Optional connection from where to send from
374 incremental: Only send the difference from the nearest snapshot already sent
375 """
376 src_snapshots = self.snapshots()
377 dst_snapshots = destination.snapshots()
378 self._remove_source_subvolumes(src_snapshots)
380 if snapshot_name in dst_snapshots:
381 log.info("Snapshot already present at %s", destination.type)
382 return
384 if snapshot_name not in src_snapshots:
385 raise exceptions.SnapshotNotFoundError(f"The snapshot {snapshot_name} does not exist.")
387 snapshot = src_snapshots[snapshot_name]
389 parent_snapshot_name = None
390 if incremental:
391 parent_snapshot_name = self._get_nearest_matching_snapshot(
392 src_group_names=set(src_snapshots),
393 dst_group_names=set(dst_snapshots),
394 snapshot_name=snapshot_name,
395 )
397 snapshot_parent_mapping = None
398 if parent_snapshot_name:
399 log.info("Using incremental send based on snapshot: %s", parent_snapshot_name)
400 parent_snapshot = src_snapshots[parent_snapshot_name]
401 snapshot_parent_mapping = self._map_parent_snapshots(snapshot, parent_snapshot)
403 (destination.snapshot_dir / snapshot_name).mkdir(parents=True)
405 with send_con:
406 for subvol in snapshot.subvolumes:
407 parent_param = ""
408 if (
409 parent_snapshot_name
410 and snapshot_parent_mapping is not None
411 and snapshot_parent_mapping[subvol] is True
412 ):
413 parent_param = (
414 f" -p {shlex.quote(str(self.snapshot_dir / parent_snapshot_name / subvol))}"
415 )
417 send_cmd = (
418 f"{self.connection.exec_prefix}btrfs send{parent_param}"
419 f" {shlex.quote(str(self.snapshot_dir / snapshot_name / subvol))}"
420 )
421 receive_cmd = (
422 f"{destination.connection.exec_prefix}btrfs receive"
423 f" {shlex.quote(str(destination.snapshot_dir / snapshot_name))}"
424 )
425 log.info(
426 "Sending snapshot: %s from %s to %s",
427 str(snapshot_name / subvol),
428 self.type,
429 destination.type,
430 )
431 send_con.run_process(["bash", "-c", f"{send_cmd} | {receive_cmd}"])
434@dataclass
435class SourceBackupTargetHost(BackupTargetHost):
436 """Describes a source host containing backups. An extention of the generic BackupHost."""
438 @property
439 def type(self) -> str:
440 """
441 Returns:
442 if it's a source or destination host.
443 """
444 return "source"
446 def create_snapshot(self, snapshot_name: str) -> Snapshot:
447 """
448 Create a new snapshot for this target with the given name.
450 Args:
451 snapshot_name: Name of the snapshot.
453 Returns:
454 Instance of the newly created snapshot.
455 """
456 log.debug("Identify target subvolumes to backup")
458 src_subvolumes = self.subvolumes()
459 src_target_subvolumes = [
460 self.path("/") / x.relative_to(self.connection.location)
461 for x in src_subvolumes
462 if x.is_relative_to(self.connection.location)
463 ]
465 for subvolume in list(
466 self.filter_subvolumes_by_backup_strategy(
467 src_target_subvolumes, {SubvolumeBackupStrategy.IGNORE}
468 )
469 ):
470 src_target_subvolumes.remove(subvolume)
472 if not src_target_subvolumes:
473 raise exceptions.BtrfsSubvolumeNotFoundError(
474 f"The target {self.name} does not contain any btrfs subvolumes"
475 )
477 snapshot = Snapshot.from_new(
478 name=snapshot_name,
479 subvolumes=src_target_subvolumes,
480 base_path=self.snapshot_dir,
481 )
483 original_src_target_subvolumes = [
484 self.connection.location / x for x in snapshot.subvolumes_unescaped
485 ]
486 new_src_target_snapshots = [
487 self.snapshot_dir / snapshot.name / x for x in snapshot.subvolumes
488 ]
490 log.debug("Create snapshots")
491 (snapshot.base_path / snapshot.name).mkdir(parents=True)
493 for source_path, snapshot_path in zip(
494 original_src_target_subvolumes, new_src_target_snapshots
495 ):
496 self.connection.run_process(
497 ["btrfs", "subvolume", "snapshot", "-r", str(source_path), str(snapshot_path)]
498 )
500 return snapshot
503@dataclass
504class DestinationBackupTargetHost(BackupTargetHost):
505 """Describes a destination host containing backups. An extention of the generic BackupHost."""
507 @property
508 def type(self) -> str:
509 """
510 Returns:
511 if it's a source or destination host.
512 """
513 return "destination"
516def _connection_sort_key(
517 pair: tuple[str, Connection | contextlib.nullcontext, Connection | contextlib.nullcontext],
518):
519 def conn_key(conn):
520 if isinstance(conn, LocalConnection):
521 return (1,)
522 elif isinstance(conn, SSHConnection):
523 return (2, conn.host, conn.port, conn.user)
524 else:
525 return (0,)
527 return (conn_key(pair[1]), conn_key(pair[2]))
530def _mark_keep_open(
531 pairs: list[
532 tuple[str, Connection | contextlib.nullcontext, Connection | contextlib.nullcontext]
533 ],
534):
535 connection_groups: dict[tuple[str, int, str], list[SSHConnection]] = defaultdict(list)
537 for _name, a, b in pairs:
538 for conn in (a, b):
539 if isinstance(conn, SSHConnection):
540 key = (conn.host, conn.port, conn.user)
541 connection_groups[key].append(conn)
543 for conns in connection_groups.values():
544 for conn in conns[:-1]:
545 conn.keep_open = True
548def host_generator(
549 target_choice: ChoiceSelector,
550 backup_targets: dict[str, BackupTarget],
551 *,
552 use_source: bool = True,
553 use_destination: bool = True,
554) -> Generator[
555 tuple[SourceBackupTargetHost | None, DestinationBackupTargetHost | None], None, None
556]:
557 """
558 Creates a generator containing connected TargetHosts for source and destination.
560 Args:
561 target_choice: A ChoiceSelector list of targets to be used
562 backup_targets: A dict containing all targets available
563 use_source: If false, the source host will be omitted
564 use_destination: If false, the destination host will be omitted
566 Returns:
567 A tuple containing source and destination TargetHosts
568 """
569 target_names = target_choice.resolve_target(backup_targets)
570 target_connections = sorted(
571 (
572 (
573 target_name,
574 Connection.from_url(backup_targets[target_name].source if use_source else None),
575 Connection.from_url(
576 backup_targets[target_name].destination if use_destination else None
577 ),
578 )
579 for target_name in target_names
580 ),
581 key=_connection_sort_key,
582 )
583 _mark_keep_open(target_connections)
585 for target_name, source, destination in target_connections:
586 log.info("Backup target: %s", target_name)
588 with source as src_con, destination as dst_con:
589 src_host = None
590 if src_con:
591 src_host = BackupTargetHost.from_source_host(
592 target_name=target_name,
593 target_config=backup_targets[target_name],
594 connection=src_con,
595 )
597 dst_host = None
598 if dst_con:
599 dst_host = BackupTargetHost.from_destination_host(
600 target_name=target_name,
601 target_config=backup_targets[target_name],
602 connection=dst_con,
603 )
605 yield src_host, dst_host