Skip to content

Backup target host

BackupTargetHost dataclass

BackupTargetHost(
    name, target_config, snapshot_dir, connection
)

Describes a host containing backups. Can be source and destination.

Attributes:

Name Type Description
name str

The name of the TargetHost

target_config BackupTarget

The Config object describing this BackupTarget

snapshot_dir BackupHostPath

Path to the snapshots of this target on this host

connection Connection

Connection object to the host

type abstractmethod property

type

Returns:

Type Description
str

if it's a source or destination host.

delete_snapshot

delete_snapshot(snapshot, subvolumes=None)

Delete a snapshot.

Parameters:

Name Type Description Default
snapshot Snapshot

Snapshot to delete

required
subvolumes list[BackupHostPath] | None

Subvolumes to delete. If None, all subvolumes are deleted

None
Source code in b4_backup/main/backup_target_host.py
def delete_snapshot(
    self,
    snapshot: Snapshot,
    subvolumes: list[BackupHostPath] | None = None,
) -> None:
    """
    Delete a snapshot.

    Args:
        snapshot: Snapshot to delete
        subvolumes: Subvolumes to delete. If None, all subvolumes are deleted
    """
    if subvolumes is None:
        subvolumes = snapshot.subvolumes

    for subvolume in snapshot.subvolumes:
        if subvolume not in subvolumes:
            continue

        subvolume_dir = snapshot.base_path / snapshot.name / subvolume

        log.info("Delete snapshot %s on %s", str(snapshot.name / subvolume), self.type)
        self.connection.run_process(["btrfs", "subvolume", "delete", str(subvolume_dir)])

    if subvolumes == snapshot.subvolumes:
        (snapshot.base_path / snapshot.name).rmdir()

filter_subvolumes_by_backup_strategy

filter_subvolumes_by_backup_strategy(
    subvolumes, backup_strategies
)

Retrieve subvolumes that are marked as source only from a snapshot.

Parameters:

Name Type Description Default
subvolumes Iterable[BackupHostPath]

Subvolumes to filter

required
backup_strategies set[SubvolumeBackupStrategy]

Backup strategies to search for

required

Returns:

Type Description
None

Generator of subvolumes

Source code in b4_backup/main/backup_target_host.py
def filter_subvolumes_by_backup_strategy(
    self,
    subvolumes: Iterable[BackupHostPath],
    backup_strategies: set[SubvolumeBackupStrategy],
) -> Generator[BackupHostPath, None, None]:
    """
    Retrieve subvolumes that are marked as source only from a snapshot.

    Args:
        subvolumes: Subvolumes to filter
        backup_strategies: Backup strategies to search for

    Returns:
        Generator of subvolumes
    """
    return self._filter_subvolumes(
        (self.path("/") / x for x in subvolumes),
        [
            PurePath(k)
            for k, v in self.target_config.subvolume_rules.items()
            if v.backup_strategy in backup_strategies
        ],
    )

from_destination_host classmethod

from_destination_host(
    target_name, target_config, connection
)

Create an instance for a backup destination.

Parameters:

Name Type Description Default
target_name str

Name of the target

required
target_config BackupTarget

Target config

required
connection Connection

Host connection

required

Returns:

Type Description
DestinationBackupTargetHost

BackupHost instance

Source code in b4_backup/main/backup_target_host.py
@classmethod
def from_destination_host(
    cls,
    target_name: str,
    target_config: BackupTarget,
    connection: Connection,
) -> "DestinationBackupTargetHost":
    """
    Create an instance for a backup destination.

    Args:
        target_name: Name of the target
        target_config: Target config
        connection: Host connection

    Returns:
        BackupHost instance
    """
    host = DestinationBackupTargetHost(
        name=target_name,
        target_config=target_config,
        snapshot_dir=BackupHostPath(
            connection.location / "snapshots" / target_name, connection=connection
        ),
        connection=connection,
    )

    if (
        target_config.if_dst_dir_not_found == OnDestinationDirNotFound.FAIL
        and not host.path().exists()
    ):
        raise exceptions.DestinationDirectoryNotFoundError(
            "Destination directory does not exist. B4 is configured to fail. Is the device mounted properly?"
        )

    return host

from_source_host classmethod

from_source_host(target_name, target_config, connection)

Create an instance for a backup source.

Parameters:

Name Type Description Default
target_name str

Name of the target

required
target_config BackupTarget

Target config

required
connection Connection

Host connection

required

Returns:

Type Description
SourceBackupTargetHost

BackupHost instance

Source code in b4_backup/main/backup_target_host.py
@classmethod
def from_source_host(
    cls,
    target_name: str,
    target_config: BackupTarget,
    connection: Connection,
) -> "SourceBackupTargetHost":
    """
    Create an instance for a backup source.

    Args:
        target_name: Name of the target
        target_config: Target config
        connection: Host connection

    Returns:
        BackupHost instance
    """
    target_snapshot_dir = (
        BackupTargetHost._mount_point(connection)
        / target_config.src_snapshot_dir
        / "snapshots"
        / target_name
    )

    return SourceBackupTargetHost(
        name=target_name,
        target_config=target_config,
        snapshot_dir=BackupHostPath(target_snapshot_dir, connection=connection),
        connection=connection,
    )

mount_point

mount_point()

Returns:

Type Description
BackupHostPath

the mount point of the target location.

Source code in b4_backup/main/backup_target_host.py
def mount_point(self) -> BackupHostPath:
    """
    Returns:
        the mount point of the target location.
    """
    return self._mount_point(self.connection)

path

path(path=None)

Create a BackupHostPath instance.

Parameters:

Name Type Description Default
path PurePath | str | None

Pathlike object. If None, the connection location will be used

None

Returns:

Type Description
BackupHostPath

BackupHostPath instance

Source code in b4_backup/main/backup_target_host.py
def path(self, path: PurePath | str | None = None) -> BackupHostPath:
    """
    Create a BackupHostPath instance.

    Args:
        path: Pathlike object. If None, the connection location will be used

    Returns:
        BackupHostPath instance
    """
    if path is None:
        path = self.connection.location

    return BackupHostPath(path, connection=self.connection)

remove_empty_dirs

remove_empty_dirs(path, _subvolumes=None)

Recursively delete empty directories.

Returns:

Type Description
bool

True if the top dir got deleted.

Source code in b4_backup/main/backup_target_host.py
def remove_empty_dirs(
    self, path: BackupHostPath, _subvolumes: set[BackupHostPath] | None = None
) -> bool:
    """
    Recursively delete empty directories.

    Returns:
        True if the top dir got deleted.
    """
    if _subvolumes is None:
        _subvolumes = set(self.subvolumes())

    empty = True
    for subpath in path.iterdir():
        if (
            subpath in _subvolumes
            or not subpath.is_dir()
            or not self.remove_empty_dirs(subpath, _subvolumes=_subvolumes)
        ):
            empty = False

    if empty:
        log.debug("Removing empty dir: %s", path)
        path.rmdir()

    return empty

send_snapshot

send_snapshot(
    destination,
    snapshot_name,
    send_con=LocalConnection(PurePath()),
    incremental=True,
)

Send a snapshot to the destination host.

Parameters:

Name Type Description Default
destination BackupTargetHost

Destination host

required
snapshot_name str

snapshot to transmit

required
send_con LocalConnection

Optional connection from where to send from

LocalConnection(PurePath())
incremental bool

Only send the difference from the nearest snapshot already sent

True
Source code in b4_backup/main/backup_target_host.py
def send_snapshot(
    self,
    destination: "BackupTargetHost",
    snapshot_name: str,
    send_con: LocalConnection = LocalConnection(PurePath()),
    incremental: bool = True,
) -> None:
    """
    Send a snapshot to the destination host.

    Args:
        destination: Destination host
        snapshot_name: snapshot to transmit
        send_con: Optional connection from where to send from
        incremental: Only send the difference from the nearest snapshot already sent
    """
    src_snapshots = self.snapshots()
    dst_snapshots = destination.snapshots()
    self._remove_source_subvolumes(src_snapshots)

    if snapshot_name in dst_snapshots:
        log.info("Snapshot already present at %s", destination.type)
        return

    if snapshot_name not in src_snapshots:
        raise exceptions.SnapshotNotFoundError(f"The snapshot {snapshot_name} does not exist.")

    snapshot = src_snapshots[snapshot_name]

    parent_snapshot_name = None
    if incremental:
        parent_snapshot_name = self._get_nearest_matching_snapshot(
            src_group_names=set(src_snapshots),
            dst_group_names=set(dst_snapshots),
            snapshot_name=snapshot_name,
        )

    snapshot_parent_mapping = None
    if parent_snapshot_name:
        log.info("Using incremental send based on snapshot: %s", parent_snapshot_name)
        parent_snapshot = src_snapshots[parent_snapshot_name]
        snapshot_parent_mapping = self._map_parent_snapshots(snapshot, parent_snapshot)

    (destination.snapshot_dir / snapshot_name).mkdir(parents=True)

    with send_con:
        for subvol in snapshot.subvolumes:
            parent_param = ""
            if (
                parent_snapshot_name
                and snapshot_parent_mapping is not None
                and snapshot_parent_mapping[subvol] is True
            ):
                parent_param = (
                    f" -p {shlex.quote(str(self.snapshot_dir / parent_snapshot_name / subvol))}"
                )

            send_cmd = (
                f"{self.connection.exec_prefix}btrfs send{parent_param}"
                f" {shlex.quote(str(self.snapshot_dir / snapshot_name / subvol))}"
            )
            receive_cmd = (
                f"{destination.connection.exec_prefix}btrfs receive"
                f" {shlex.quote(str(destination.snapshot_dir / snapshot_name))}"
            )
            log.info(
                "Sending snapshot: %s from %s to %s",
                str(snapshot_name / subvol),
                self.type,
                destination.type,
            )
            send_con.run_process(["bash", "-c", f"{send_cmd} | {receive_cmd}"])

snapshots

snapshots()

Returns:

Type Description
dict[str, Snapshot]

All snapshots for that host/target.

Source code in b4_backup/main/backup_target_host.py
def snapshots(self) -> dict[str, Snapshot]:
    """
    Returns:
        All snapshots for that host/target.
    """
    return {
        k: Snapshot(
            name=k,
            subvolumes=v,
            base_path=self.snapshot_dir,
        )
        for k, v in self._group_subvolumes(
            self.subvolumes(),
            self.snapshot_dir,
        ).items()
    }

source_subvolumes_from_snapshot

source_subvolumes_from_snapshot(snapshot)

Retrieve subvolumes that are marked as source only and ignore from a snapshot.

Parameters:

Name Type Description Default
snapshot Snapshot

Snapshot to retrieve the subvolumes from

required

Returns:

Type Description
None

Generator of subvolumes

Source code in b4_backup/main/backup_target_host.py
def source_subvolumes_from_snapshot(
    self, snapshot: Snapshot
) -> Generator[BackupHostPath, None, None]:
    """
    Retrieve subvolumes that are marked as source only and ignore from a snapshot.

    Args:
        snapshot: Snapshot to retrieve the subvolumes from

    Returns:
        Generator of subvolumes
    """
    return (
        Snapshot.escape_path(x)
        for x in self.filter_subvolumes_by_backup_strategy(
            snapshot.subvolumes_unescaped,
            {SubvolumeBackupStrategy.SOURCE_ONLY, SubvolumeBackupStrategy.IGNORE},
        )
    )

subvolumes

subvolumes()

Returns:

Type Description
list[BackupHostPath]

A list of btrfs subvolumes.

Source code in b4_backup/main/backup_target_host.py
def subvolumes(self) -> list[BackupHostPath]:
    """
    Returns:
        A list of btrfs subvolumes.
    """
    mount_point = self.mount_point()

    result = self.connection.run_process(["btrfs", "subvolume", "list", str(mount_point)])
    result = result.replace("top level", "top_level")

    # Format looking like this per line:
    # ID 256 gen 621187 top_level 5 path my_data
    return sorted(
        [
            mount_point / value
            for line in result.split("\n")
            # Iterate two items at a time
            for key, value in zip(*[iter(line.split())] * 2)  # type: ignore
            if key == "path"
        ]
        + [mount_point]
    )

DestinationBackupTargetHost dataclass

DestinationBackupTargetHost(
    name, target_config, snapshot_dir, connection
)

Bases: BackupTargetHost

Describes a destination host containing backups. An extention of the generic BackupHost.

type property

type

Returns:

Type Description
str

if it's a source or destination host.

SourceBackupTargetHost dataclass

SourceBackupTargetHost(
    name, target_config, snapshot_dir, connection
)

Bases: BackupTargetHost

Describes a source host containing backups. An extention of the generic BackupHost.

type property

type

Returns:

Type Description
str

if it's a source or destination host.

create_snapshot

create_snapshot(snapshot_name)

Create a new snapshot for this target with the given name.

Parameters:

Name Type Description Default
snapshot_name str

Name of the snapshot.

required

Returns:

Type Description
Snapshot

Instance of the newly created snapshot.

Source code in b4_backup/main/backup_target_host.py
def create_snapshot(self, snapshot_name: str) -> Snapshot:
    """
    Create a new snapshot for this target with the given name.

    Args:
        snapshot_name: Name of the snapshot.

    Returns:
        Instance of the newly created snapshot.
    """
    log.debug("Identify target subvolumes to backup")

    src_subvolumes = self.subvolumes()
    src_target_subvolumes = [
        self.path("/") / x.relative_to(self.connection.location)
        for x in src_subvolumes
        if x.is_relative_to(self.connection.location)
    ]

    for subvolume in list(
        self.filter_subvolumes_by_backup_strategy(
            src_target_subvolumes, {SubvolumeBackupStrategy.IGNORE}
        )
    ):
        src_target_subvolumes.remove(subvolume)

    if not src_target_subvolumes:
        raise exceptions.BtrfsSubvolumeNotFoundError(
            f"The target {self.name} does not contain any btrfs subvolumes"
        )

    snapshot = Snapshot.from_new(
        name=snapshot_name,
        subvolumes=src_target_subvolumes,
        base_path=self.snapshot_dir,
    )

    original_src_target_subvolumes = [
        self.connection.location / x for x in snapshot.subvolumes_unescaped
    ]
    new_src_target_snapshots = [
        self.snapshot_dir / snapshot.name / x for x in snapshot.subvolumes
    ]

    log.debug("Create snapshots")
    (snapshot.base_path / snapshot.name).mkdir(parents=True)

    for source_path, snapshot_path in zip(
        original_src_target_subvolumes, new_src_target_snapshots
    ):
        self.connection.run_process(
            ["btrfs", "subvolume", "snapshot", "-r", str(source_path), str(snapshot_path)]
        )

    return snapshot

host_generator

host_generator(
    target_choice,
    backup_targets,
    *,
    use_source=True,
    use_destination=True,
)

Creates a generator containing connected TargetHosts for source and destination.

Parameters:

Name Type Description Default
target_choice ChoiceSelector

A ChoiceSelector list of targets to be used

required
backup_targets dict[str, BackupTarget]

A dict containing all targets available

required
use_source bool

If false, the source host will be omitted

True
use_destination bool

If false, the destination host will be omitted

True

Returns:

Type Description
None

A tuple containing source and destination TargetHosts

Source code in b4_backup/main/backup_target_host.py
def host_generator(
    target_choice: ChoiceSelector,
    backup_targets: dict[str, BackupTarget],
    *,
    use_source: bool = True,
    use_destination: bool = True,
) -> Generator[
    tuple[SourceBackupTargetHost | None, DestinationBackupTargetHost | None], None, None
]:
    """
    Creates a generator containing connected TargetHosts for source and destination.

    Args:
        target_choice: A ChoiceSelector list of targets to be used
        backup_targets: A dict containing all targets available
        use_source: If false, the source host will be omitted
        use_destination: If false, the destination host will be omitted

    Returns:
        A tuple containing source and destination TargetHosts
    """
    target_names = target_choice.resolve_target(backup_targets)
    target_connections = sorted(
        (
            (
                target_name,
                Connection.from_url(backup_targets[target_name].source if use_source else None),
                Connection.from_url(
                    backup_targets[target_name].destination if use_destination else None
                ),
            )
            for target_name in target_names
        ),
        key=_connection_sort_key,
    )
    _mark_keep_open(target_connections)

    for target_name, source, destination in target_connections:
        log.info("Backup target: %s", target_name)

        with source as src_con, destination as dst_con:
            src_host = None
            if src_con:
                src_host = BackupTargetHost.from_source_host(
                    target_name=target_name,
                    target_config=backup_targets[target_name],
                    connection=src_con,
                )

            dst_host = None
            if dst_con:
                dst_host = BackupTargetHost.from_destination_host(
                    target_name=target_name,
                    target_config=backup_targets[target_name],
                    connection=dst_con,
                )

            yield src_host, dst_host