Coverage for b4_backup/main/backup_target_host.py: 100%

199 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-18 22:40 +0000

1import contextlib 

2import logging 

3import shlex 

4from abc import ABCMeta, abstractmethod 

5from collections import defaultdict 

6from collections.abc import Generator, Iterable, Sequence 

7from dataclasses import dataclass 

8from pathlib import PurePath 

9 

10from b4_backup import exceptions 

11from b4_backup.config_schema import ( 

12 BackupTarget, 

13 OnDestinationDirNotFound, 

14 SubvolumeBackupStrategy, 

15) 

16from b4_backup.main.connection import Connection, LocalConnection, SSHConnection 

17from b4_backup.main.dataclass import BackupHostPath, ChoiceSelector, Snapshot 

18from b4_backup.utils import contains_path 

19 

# Shared logger for the b4_backup.main package; handlers are configured elsewhere.
log = logging.getLogger("b4_backup.main")

21 

22 

@dataclass
class BackupTargetHost(metaclass=ABCMeta):
    """
    Describes a host containing backups. Can be source and destination.

    Abstract base: concrete hosts are SourceBackupTargetHost and
    DestinationBackupTargetHost, created via the from_source_host /
    from_destination_host classmethods.

    Attributes:
        name: The name of the TargetHost
        target_config: The Config object describing this BackupTarget
        snapshot_dir: Path to the snapshots of this target on this host
        connection: Connection object to the host
    """

    # Target name; also used as the last component of the snapshot directory.
    name: str
    target_config: BackupTarget
    # Absolute path of this target's snapshot directory on this host.
    snapshot_dir: BackupHostPath
    connection: Connection

39 

40 @classmethod 

41 def from_source_host( 

42 cls, 

43 target_name: str, 

44 target_config: BackupTarget, 

45 connection: Connection, 

46 ) -> "SourceBackupTargetHost": 

47 """ 

48 Create an instance for a backup source. 

49 

50 Args: 

51 target_name: Name of the target 

52 target_config: Target config 

53 connection: Host connection 

54 

55 Returns: 

56 BackupHost instance 

57 """ 

58 target_snapshot_dir = ( 

59 BackupTargetHost._mount_point(connection) 

60 / target_config.src_snapshot_dir 

61 / "snapshots" 

62 / target_name 

63 ) 

64 

65 return SourceBackupTargetHost( 

66 name=target_name, 

67 target_config=target_config, 

68 snapshot_dir=BackupHostPath(target_snapshot_dir, connection=connection), 

69 connection=connection, 

70 ) 

71 

72 @classmethod 

73 def from_destination_host( 

74 cls, 

75 target_name: str, 

76 target_config: BackupTarget, 

77 connection: Connection, 

78 ) -> "DestinationBackupTargetHost": 

79 """ 

80 Create an instance for a backup destination. 

81 

82 Args: 

83 target_name: Name of the target 

84 target_config: Target config 

85 connection: Host connection 

86 

87 Returns: 

88 BackupHost instance 

89 """ 

90 host = DestinationBackupTargetHost( 

91 name=target_name, 

92 target_config=target_config, 

93 snapshot_dir=BackupHostPath( 

94 connection.location / "snapshots" / target_name, connection=connection 

95 ), 

96 connection=connection, 

97 ) 

98 

99 if ( 

100 target_config.if_dst_dir_not_found == OnDestinationDirNotFound.FAIL 

101 and not host.path().exists() 

102 ): 

103 raise exceptions.DestinationDirectoryNotFoundError( 

104 "Destination directory does not exist. B4 is configured to fail. Is the device mounted properly?" 

105 ) 

106 

107 return host 

108 

109 @classmethod 

110 def _mount_point(cls, connection: Connection) -> BackupHostPath: 

111 result = connection.run_process(["mount"]) 

112 

113 possible_paths: list[PurePath] = [] 

114 for line in result.split("\n"): 

115 if line == "": 

116 continue 

117 

118 columns = line.split() 

119 path = PurePath(columns[2]) 

120 

121 if columns[4] != "btrfs" or not connection.location.is_relative_to(path): 

122 continue 

123 

124 possible_paths.append(path) 

125 

126 if len(possible_paths) == 0: 

127 raise exceptions.BtrfsPartitionNotFoundError( 

128 f"{connection.location} is not located on a valid btrfs partition" 

129 ) 

130 

131 return BackupHostPath(sorted(possible_paths)[-1], connection=connection) 

132 

133 def mount_point(self) -> BackupHostPath: 

134 """ 

135 Returns: 

136 the mount point of the target location. 

137 """ 

138 return self._mount_point(self.connection) 

139 

    @property
    @abstractmethod
    def type(self) -> str:
        """
        Returns:
            if it's a source or destination host.
        """
        # Abstract: concrete subclasses return the literal "source" or "destination".

148 def subvolumes(self) -> list[BackupHostPath]: 

149 """ 

150 Returns: 

151 A list of btrfs subvolumes. 

152 """ 

153 mount_point = self.mount_point() 

154 

155 result = self.connection.run_process(["btrfs", "subvolume", "list", str(mount_point)]) 

156 result = result.replace("top level", "top_level") 

157 

158 # Format looking like this per line: 

159 # ID 256 gen 621187 top_level 5 path my_data 

160 return sorted( 

161 [ 

162 mount_point / value 

163 for line in result.split("\n") 

164 # Iterate two items at a time 

165 for key, value in zip(*[iter(line.split())] * 2) # type: ignore 

166 if key == "path" 

167 ] 

168 + [mount_point] 

169 ) 

170 

    def remove_empty_dirs(
        self, path: BackupHostPath, _subvolumes: set[BackupHostPath] | None = None
    ) -> bool:
        """
        Recursively delete empty directories.

        A directory counts as empty when every entry in it is itself an empty
        directory that could be removed; btrfs subvolumes and regular files
        make it non-empty.

        Args:
            path: Directory to prune.
            _subvolumes: Internal cache of the host's subvolume set so the
                recursion only queries it once.

        Returns:
            True if the top dir got deleted.
        """
        if _subvolumes is None:
            # Top of the recursion: fetch the subvolume set exactly once.
            _subvolumes = set(self.subvolumes())

        empty = True
        for subpath in path.iterdir():
            # Short-circuit order matters: subvolumes and non-directories are
            # never recursed into; the recursive call both prunes the child
            # and reports whether it could be removed.
            if (
                subpath in _subvolumes
                or not subpath.is_dir()
                or not self.remove_empty_dirs(subpath, _subvolumes=_subvolumes)
            ):
                empty = False

        if empty:
            log.debug("Removing empty dir: %s", path)
            path.rmdir()

        return empty

197 

198 def _group_subvolumes( 

199 self, subvolumes: Sequence[BackupHostPath], parent_dir: BackupHostPath 

200 ) -> dict[str, list[BackupHostPath]]: 

201 relevant_subvolumes = [ 

202 x 

203 for x in subvolumes 

204 if x.is_relative_to(parent_dir) and len(x.relative_to(parent_dir).parts) >= 1 

205 ] 

206 

207 result_dict: dict[str, list] = {} 

208 for subvol in relevant_subvolumes: 

209 group = subvol.relative_to(parent_dir) 

210 group_name = group.parts[0] 

211 group_subdir = group.relative_to(group_name) 

212 

213 if group_name not in result_dict: 

214 result_dict[group_name] = [] 

215 

216 result_dict[group_name].append(group_subdir) 

217 

218 return result_dict 

219 

220 def snapshots(self) -> dict[str, Snapshot]: 

221 """ 

222 Returns: 

223 All snapshots for that host/target. 

224 """ 

225 return { 

226 k: Snapshot( 

227 name=k, 

228 subvolumes=v, 

229 base_path=self.snapshot_dir, 

230 ) 

231 for k, v in self._group_subvolumes( 

232 self.subvolumes(), 

233 self.snapshot_dir, 

234 ).items() 

235 } 

236 

237 def path(self, path: PurePath | str | None = None) -> BackupHostPath: 

238 """ 

239 Create a BackupHostPath instance. 

240 

241 Args: 

242 path: Pathlike object. If None, the connection location will be used 

243 

244 Returns: 

245 BackupHostPath instance 

246 """ 

247 if path is None: 

248 path = self.connection.location 

249 

250 return BackupHostPath(path, connection=self.connection) 

251 

252 def delete_snapshot( 

253 self, 

254 snapshot: Snapshot, 

255 subvolumes: list[BackupHostPath] | None = None, 

256 ) -> None: 

257 """ 

258 Delete a snapshot. 

259 

260 Args: 

261 snapshot: Snapshot to delete 

262 subvolumes: Subvolumes to delete. If None, all subvolumes are deleted 

263 """ 

264 if subvolumes is None: 

265 subvolumes = snapshot.subvolumes 

266 

267 for subvolume in snapshot.subvolumes: 

268 if subvolume not in subvolumes: 

269 continue 

270 

271 subvolume_dir = snapshot.base_path / snapshot.name / subvolume 

272 

273 log.info("Delete snapshot %s on %s", str(snapshot.name / subvolume), self.type) 

274 self.connection.run_process(["btrfs", "subvolume", "delete", str(subvolume_dir)]) 

275 

276 if subvolumes == snapshot.subvolumes: 

277 (snapshot.base_path / snapshot.name).rmdir() 

278 

279 def _get_nearest_matching_snapshot( 

280 self, 

281 snapshot_name: str, 

282 src_group_names: set[str], 

283 dst_group_names: set[str], 

284 ) -> str | None: 

285 matching_groups = sorted(src_group_names & dst_group_names) 

286 

287 if not matching_groups: 

288 return None 

289 

290 return ( 

291 [x for x in sorted(matching_groups, reverse=True) if x < snapshot_name] 

292 + [x for x in sorted(matching_groups) if x > snapshot_name] 

293 )[0] 

294 

295 def _map_parent_snapshots( 

296 self, new_snapshot: Snapshot, parent_snapshot: Snapshot 

297 ) -> dict[PurePath, bool]: 

298 parent_snapshot_set = set(parent_snapshot.subvolumes) 

299 return {x: x in parent_snapshot_set for x in new_snapshot.subvolumes} 

300 

301 @classmethod 

302 def _filter_subvolumes( 

303 cls, subvolumes: Iterable[BackupHostPath], search_paths: list[PurePath] 

304 ) -> Generator[BackupHostPath, None, None]: 

305 return ( 

306 x 

307 for x in subvolumes 

308 if any(contains_path(x, search_path) for search_path in search_paths) 

309 ) 

310 

311 def source_subvolumes_from_snapshot( 

312 self, snapshot: Snapshot 

313 ) -> Generator[BackupHostPath, None, None]: 

314 """ 

315 Retrieve subvolumes that are marked as source only and ignore from a snapshot. 

316 

317 Args: 

318 snapshot: Snapshot to retrieve the subvolumes from 

319 

320 Returns: 

321 Generator of subvolumes 

322 """ 

323 return ( 

324 Snapshot.escape_path(x) 

325 for x in self.filter_subvolumes_by_backup_strategy( 

326 snapshot.subvolumes_unescaped, 

327 {SubvolumeBackupStrategy.SOURCE_ONLY, SubvolumeBackupStrategy.IGNORE}, 

328 ) 

329 ) 

330 

331 def filter_subvolumes_by_backup_strategy( 

332 self, 

333 subvolumes: Iterable[BackupHostPath], 

334 backup_strategies: set[SubvolumeBackupStrategy], 

335 ) -> Generator[BackupHostPath, None, None]: 

336 """ 

337 Retrieve subvolumes that are marked as source only from a snapshot. 

338 

339 Args: 

340 subvolumes: Subvolumes to filter 

341 backup_strategies: Backup strategies to search for 

342 

343 Returns: 

344 Generator of subvolumes 

345 """ 

346 return self._filter_subvolumes( 

347 (self.path("/") / x for x in subvolumes), 

348 [ 

349 PurePath(k) 

350 for k, v in self.target_config.subvolume_rules.items() 

351 if v.backup_strategy in backup_strategies 

352 ], 

353 ) 

354 

355 def _remove_source_subvolumes(self, snapshots: dict[str, Snapshot]) -> None: 

356 for snapshot in snapshots.values(): 

357 for subvolume in list(self.source_subvolumes_from_snapshot(snapshot)): 

358 snapshot.subvolumes.remove(subvolume) 

359 

360 def send_snapshot( 

361 self, 

362 destination: "BackupTargetHost", 

363 snapshot_name: str, 

364 send_con: LocalConnection = LocalConnection(PurePath()), 

365 incremental: bool = True, 

366 ) -> None: 

367 """ 

368 Send a snapshot to the destination host. 

369 

370 Args: 

371 destination: Destination host 

372 snapshot_name: snapshot to transmit 

373 send_con: Optional connection from where to send from 

374 incremental: Only send the difference from the nearest snapshot already sent 

375 """ 

376 src_snapshots = self.snapshots() 

377 dst_snapshots = destination.snapshots() 

378 self._remove_source_subvolumes(src_snapshots) 

379 

380 if snapshot_name in dst_snapshots: 

381 log.info("Snapshot already present at %s", destination.type) 

382 return 

383 

384 if snapshot_name not in src_snapshots: 

385 raise exceptions.SnapshotNotFoundError(f"The snapshot {snapshot_name} does not exist.") 

386 

387 snapshot = src_snapshots[snapshot_name] 

388 

389 parent_snapshot_name = None 

390 if incremental: 

391 parent_snapshot_name = self._get_nearest_matching_snapshot( 

392 src_group_names=set(src_snapshots), 

393 dst_group_names=set(dst_snapshots), 

394 snapshot_name=snapshot_name, 

395 ) 

396 

397 snapshot_parent_mapping = None 

398 if parent_snapshot_name: 

399 log.info("Using incremental send based on snapshot: %s", parent_snapshot_name) 

400 parent_snapshot = src_snapshots[parent_snapshot_name] 

401 snapshot_parent_mapping = self._map_parent_snapshots(snapshot, parent_snapshot) 

402 

403 (destination.snapshot_dir / snapshot_name).mkdir(parents=True) 

404 

405 with send_con: 

406 for subvol in snapshot.subvolumes: 

407 parent_param = "" 

408 if ( 

409 parent_snapshot_name 

410 and snapshot_parent_mapping is not None 

411 and snapshot_parent_mapping[subvol] is True 

412 ): 

413 parent_param = ( 

414 f" -p {shlex.quote(str(self.snapshot_dir / parent_snapshot_name / subvol))}" 

415 ) 

416 

417 send_cmd = ( 

418 f"{self.connection.exec_prefix}btrfs send{parent_param}" 

419 f" {shlex.quote(str(self.snapshot_dir / snapshot_name / subvol))}" 

420 ) 

421 receive_cmd = ( 

422 f"{destination.connection.exec_prefix}btrfs receive" 

423 f" {shlex.quote(str(destination.snapshot_dir / snapshot_name))}" 

424 ) 

425 log.info( 

426 "Sending snapshot: %s from %s to %s", 

427 str(snapshot_name / subvol), 

428 self.type, 

429 destination.type, 

430 ) 

431 send_con.run_process(["bash", "-c", f"{send_cmd} | {receive_cmd}"]) 

432 

433 

@dataclass
class SourceBackupTargetHost(BackupTargetHost):
    """Describes a source host containing backups. An extension of the generic BackupHost."""

    @property
    def type(self) -> str:
        """
        Returns:
            if it's a source or destination host.
        """
        return "source"

    def create_snapshot(self, snapshot_name: str) -> Snapshot:
        """
        Create a new snapshot for this target with the given name.

        Args:
            snapshot_name: Name of the snapshot.

        Raises:
            BtrfsSubvolumeNotFoundError: If the target location contains no subvolumes.

        Returns:
            Instance of the newly created snapshot.
        """
        log.debug("Identify target subvolumes to backup")

        location = self.connection.location
        # Subvolumes below the target location, rooted at "/".
        candidates = [
            self.path("/") / subvolume.relative_to(location)
            for subvolume in self.subvolumes()
            if subvolume.is_relative_to(location)
        ]

        # Drop subvolumes whose rule says to ignore them.
        ignored = list(
            self.filter_subvolumes_by_backup_strategy(candidates, {SubvolumeBackupStrategy.IGNORE})
        )
        for subvolume in ignored:
            candidates.remove(subvolume)

        if not candidates:
            raise exceptions.BtrfsSubvolumeNotFoundError(
                f"The target {self.name} does not contain any btrfs subvolumes"
            )

        snapshot = Snapshot.from_new(
            name=snapshot_name,
            subvolumes=candidates,
            base_path=self.snapshot_dir,
        )

        source_paths = [location / subvolume for subvolume in snapshot.subvolumes_unescaped]
        snapshot_paths = [
            self.snapshot_dir / snapshot.name / subvolume for subvolume in snapshot.subvolumes
        ]

        log.debug("Create snapshots")
        (snapshot.base_path / snapshot.name).mkdir(parents=True)

        for source_path, snapshot_path in zip(source_paths, snapshot_paths):
            # -r creates read-only snapshots, as required by `btrfs send`.
            self.connection.run_process(
                ["btrfs", "subvolume", "snapshot", "-r", str(source_path), str(snapshot_path)]
            )

        return snapshot

501 

502 

@dataclass
class DestinationBackupTargetHost(BackupTargetHost):
    """Describes a destination host containing backups. An extension of the generic BackupHost."""

    @property
    def type(self) -> str:
        """
        Returns:
            if it's a source or destination host.
        """
        return "destination"

514 

515 

def _connection_sort_key(
    pair: tuple[str, Connection | contextlib.nullcontext, Connection | contextlib.nullcontext],
):
    """Sort key over (name, source, destination) connection triples.

    Non-connections sort first, then local connections, then SSH connections
    ordered by host, port and user — so equal SSH endpoints end up adjacent.
    """

    def conn_key(conn):
        # Keep the isinstance order: LocalConnection is checked before SSHConnection.
        if isinstance(conn, LocalConnection):
            return (1,)
        if isinstance(conn, SSHConnection):
            return (2, conn.host, conn.port, conn.user)
        return (0,)

    _name, source, destination = pair
    return (conn_key(source), conn_key(destination))

528 

529 

def _mark_keep_open(
    pairs: list[
        tuple[str, Connection | contextlib.nullcontext, Connection | contextlib.nullcontext]
    ],
):
    """Set keep_open on all but the last SSH connection per endpoint.

    Connections are grouped by (host, port, user); every occurrence except the
    final one is flagged keep_open=True — presumably so the underlying session
    is reused across targets (see Connection for the exact semantics).
    """
    grouped: dict[tuple[str, int, str], list[SSHConnection]] = defaultdict(list)

    for _name, source, destination in pairs:
        for connection in (source, destination):
            if isinstance(connection, SSHConnection):
                grouped[(connection.host, connection.port, connection.user)].append(connection)

    for connections in grouped.values():
        for connection in connections[:-1]:
            connection.keep_open = True

546 

547 

def host_generator(
    target_choice: ChoiceSelector,
    backup_targets: dict[str, BackupTarget],
    *,
    use_source: bool = True,
    use_destination: bool = True,
) -> Generator[
    tuple[SourceBackupTargetHost | None, DestinationBackupTargetHost | None], None, None
]:
    """
    Creates a generator containing connected TargetHosts for source and destination.

    Args:
        target_choice: A ChoiceSelector list of targets to be used
        backup_targets: A dict containing all targets available
        use_source: If false, the source host will be omitted
        use_destination: If false, the destination host will be omitted

    Returns:
        A tuple containing source and destination TargetHosts
    """
    target_names = target_choice.resolve_target(backup_targets)

    def connections_for(name: str):
        # Omitted sides get Connection.from_url(None) (a null context).
        target = backup_targets[name]
        return (
            name,
            Connection.from_url(target.source if use_source else None),
            Connection.from_url(target.destination if use_destination else None),
        )

    # Sort so targets sharing SSH endpoints are adjacent, then mark which
    # connections should stay open between targets.
    target_connections = sorted(
        (connections_for(name) for name in target_names),
        key=_connection_sort_key,
    )
    _mark_keep_open(target_connections)

    for target_name, source, destination in target_connections:
        log.info("Backup target: %s", target_name)

        with source as src_con, destination as dst_con:
            src_host = (
                BackupTargetHost.from_source_host(
                    target_name=target_name,
                    target_config=backup_targets[target_name],
                    connection=src_con,
                )
                if src_con
                else None
            )

            dst_host = (
                BackupTargetHost.from_destination_host(
                    target_name=target_name,
                    target_config=backup_targets[target_name],
                    connection=dst_con,
                )
                if dst_con
                else None
            )

            yield src_host, dst_host