Coverage for b4_backup/main/b4_backup.py: 100%

219 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2025-06-18 22:40 +0000

1import logging 

2import re 

3from collections.abc import Iterable 

4from dataclasses import dataclass 

5from pathlib import PurePath 

6 

7import arrow 

8 

9from b4_backup import exceptions 

10from b4_backup.config_schema import ( 

11 BaseConfig, 

12 SubvolumeFallbackStrategy, 

13 TargetRestoreStrategy, 

14) 

15from b4_backup.main.backup_target_host import ( 

16 BackupTargetHost, 

17 DestinationBackupTargetHost, 

18 SourceBackupTargetHost, 

19) 

20from b4_backup.main.dataclass import ( 

21 BackupHostPath, 

22 ChoiceSelector, 

23 RetentionGroup, 

24 Snapshot, 

25) 

26 

# Logger used throughout this module, registered under the "b4_backup.main" name.
log = logging.getLogger("b4_backup.main")

28 

29 

@dataclass
class B4Backup:
    """
    Main controller class for the backups.

    Coordinates snapshot creation, transfer, restore and retention clean up by calling
    into the source/destination host objects that are passed to each method.

    Args:
        timezone: Timezone used to generate and to interpret the timestamps in snapshot names
    """

    timezone: str = BaseConfig.timezone

    # A number, optionally followed by a time unit in singular or plural form (e.g. "3", "12hours").
    _size_pattern = re.compile(r"^(?:([0-9]+)(second|minute|hour|day|week|month|year)?s?)$")
    # arrow format string of the timestamp every snapshot name starts with.
    _timestamp_fmt = "YYYY-MM-DD-HH-mm-ss"

    def backup(
        self,
        src_host: SourceBackupTargetHost,
        dst_host: DestinationBackupTargetHost | None,
        snapshot_name: str,
    ) -> None:
        """
        Performs a backup for a single target.

        dst_host can be None. In this case nothing will be sent and only a snapshot
        + clean up on source side is performed.

        Args:
            src_host: An active source host instance
            dst_host: An active destination host instance
            snapshot_name: The name of the new snapshot
        """
        log.info("Snapshot name: %s", snapshot_name)

        src_host.create_snapshot(snapshot_name)

        if dst_host:
            src_host.send_snapshot(dst_host, snapshot_name)

        # Only the retention ruleset the new snapshot belongs to is cleaned afterwards.
        retention_name = ChoiceSelector([self._extract_retention_name(snapshot_name)])
        self.clean(
            src_host=src_host,
            dst_host=dst_host,
            retention_names=retention_name,
        )

    def restore(
        self,
        src_host: SourceBackupTargetHost,
        dst_host: DestinationBackupTargetHost | None,
        snapshot_name: str,
        strategy: TargetRestoreStrategy,
    ) -> None:
        """
        Restore a snapshot to one or more targets.

        The special snapshot name "REPLACE" reverts the last REPLACE restore instead of
        restoring a regular snapshot.

        Args:
            src_host: An active source host instance
            dst_host: An active destination host instance
            snapshot_name: Name of the snapshot you want to restore
            strategy: Restore strategy or procedure to apply

        Raises:
            SnapshotNotFoundError: If "REPLACE" is requested with a strategy other than REPLACE
        """
        if snapshot_name == "REPLACE":
            if strategy != TargetRestoreStrategy.REPLACE:
                raise exceptions.SnapshotNotFoundError(
                    "REPLACE can only be restored using REPLACE restore strategy"
                )

            log.info("Reverting last REPLACE restore")
            self._rollback_replace(src_host)

        elif strategy == TargetRestoreStrategy.REPLACE:
            log.info("Using REPLACE restore strategy")
            self._restore_replace(src_host, dst_host, snapshot_name)

        else:
            log.info("Using SAFE restore strategy")
            self._restore_safe(src_host, dst_host, snapshot_name)

    def sync(
        self,
        src_host: SourceBackupTargetHost,
        dst_host: DestinationBackupTargetHost,
    ) -> None:
        """
        Send unsent snapshots to the destination and clean them.

        Args:
            src_host: An active source host instance
            dst_host: An active destination host instance
        """
        self.clean(src_host, dst_host)

        src_snapshots = src_host.snapshots()
        dst_snapshots = dst_host.snapshots()

        # Sorted so the transfer order is deterministic. Names start with the timestamp,
        # therefore this sends the oldest missing snapshot first.
        for snapshot_name in sorted(src_snapshots.keys() - dst_snapshots.keys()):
            src_host.send_snapshot(dst_host, snapshot_name)

        self.clean(src_host, dst_host)

    def clean(
        self,
        src_host: SourceBackupTargetHost,
        dst_host: DestinationBackupTargetHost | None = None,
        # NOTE(review): this default instance is created once and shared by all calls;
        # that is only safe as long as nothing mutates it.
        retention_names: ChoiceSelector = ChoiceSelector(["ALL"]),
    ) -> None:
        """
        Apply a retention ruleset on the selected targets.

        Besides the snapshots, obsolete replaced targets and empty directories are removed.

        Args:
            src_host: An active source host instance
            dst_host: An active destination host instance
            retention_names: Name suffix of this backup (retention ruleset)
        """
        self._clean_target(src_host, dst_host, retention_names)
        self._clean_replace(src_host)
        self._clean_empty_dirs(src_host, dst_host)

    def delete(
        self,
        host: BackupTargetHost,
        snapshot_name: str,
    ) -> None:
        """
        Delete a specific snapshot from a specific target/host.

        A missing snapshot is not an error; only a warning is logged.

        Args:
            host: the selected target host
            snapshot_name: The name of the snapshot to delete
        """
        snapshots = host.snapshots()

        if snapshot_name not in snapshots:
            log.warning("Snapshot %s does not exist on %s", snapshot_name, host.type)
            return

        host.delete_snapshot(snapshots[snapshot_name])

    def delete_all(
        self,
        host: BackupTargetHost,
        # NOTE(review): this default instance is created once and shared by all calls;
        # that is only safe as long as nothing mutates it.
        retention_names: ChoiceSelector = ChoiceSelector(["ALL"]),
    ) -> None:
        """
        Delete all snapshots from a specific target/host/retention item.

        Args:
            host: the selected target host
            retention_names: The retention names the snapshots have to contain
        """
        # Query the host once and reuse the result for resolving and for iterating.
        snapshots = host.snapshots()
        resolved_retention_names = set(retention_names.resolve_retention_name(snapshots))

        for snapshot_name, snapshot in snapshots.items():
            if self._extract_retention_name(snapshot_name) in resolved_retention_names:
                host.delete_snapshot(snapshot)

    def _restore_replace(
        self,
        src_host: SourceBackupTargetHost,
        dst_host: DestinationBackupTargetHost | None,
        snapshot_name: str,
    ) -> None:
        """
        Restore a snapshot in place of the current target.

        The snapshot is made available on the source first, then the current target is
        moved aside and the snapshot is restored to the target location.
        """
        self._restore_safe(src_host, dst_host, snapshot_name)
        replace_path = self._remove_target(src_host)

        snapshot = src_host.snapshots()[snapshot_name]
        self._restore_snapshot(src_host, snapshot, existing_replaced_target=replace_path)

        self._clean_replace(src_host)

    def _restore_safe(
        self,
        src_host: SourceBackupTargetHost,
        dst_host: DestinationBackupTargetHost | None,
        snapshot_name: str,
    ) -> None:
        """
        Make the snapshot available on the source host without touching the target.

        With a destination host the snapshot is sent from there to the source. Without
        one (offline mode) the snapshot has to exist on the source already.

        Raises:
            SnapshotNotFoundError: In offline mode, if the source does not have the snapshot
        """
        if dst_host:
            dst_host.send_snapshot(src_host, snapshot_name)
            return

        log.warning("Running in offline mode. Destination host snapshots are unavailable.")

        if snapshot_name not in src_host.snapshots():
            raise exceptions.SnapshotNotFoundError(snapshot_name)

    def _rollback_replace(self, host: SourceBackupTargetHost) -> None:
        """
        Move the most recently replaced target back to the target location.

        The current target is moved into the replace directory before.

        Raises:
            SnapshotNotFoundError: If there is no replaced target to go back to
        """
        replaced_targets_dir = self._replaced_targets_dir(host)

        replaced_targets_dir.mkdir(parents=True)
        # sorted() turns the listing into a list, so the emptiness check and the [-1]
        # lookup are valid for any iterable. The entries are named by timestamp, so the
        # last element is the newest one regardless of the listing order.
        replaced_targets = sorted(replaced_targets_dir.iterdir())

        if not replaced_targets:
            raise exceptions.SnapshotNotFoundError("No old replace available to rollback")

        self._remove_target(host)

        replaced_targets[-1].rename(host.path())  # move

        self._clean_replace(host)

    def _replaced_targets_dir(self, host: SourceBackupTargetHost) -> BackupHostPath:
        """Build the path of the directory that holds the replaced copies of this target."""
        return host.mount_point() / host.target_config.src_snapshot_dir / "replace" / host.name

    def _remove_target(self, host: SourceBackupTargetHost) -> BackupHostPath | None:
        """
        Move the current target into the replace directory.

        Returns:
            The new location of the target or None, if the target path does not exist
        """
        if not host.path().exists():
            return None

        replace_name = self.generate_snapshot_name()
        log.info("Replace name: %s", replace_name)

        replace_dir = self._replaced_targets_dir(host) / replace_name

        replace_dir.parent.mkdir(parents=True)
        host.path().rename(replace_dir)

        return replace_dir

    def generate_snapshot_name(self, name: str | None = None) -> str:
        """
        Generate a name for a new snapshot.

        The name is the current time in the configured timezone, followed by
        "_<name>" if a name is given.

        Args:
            name: Retention rule name

        Returns:
            Name for a snapshot
        """
        snapshot_name = arrow.utcnow().to(self.timezone).format(self._timestamp_fmt)

        if name:
            snapshot_name += f"_{name}"

        return snapshot_name

    def _restore_snapshot(
        self,
        host: SourceBackupTargetHost,
        snapshot: Snapshot,
        existing_replaced_target: BackupHostPath | None = None,
    ) -> None:
        """
        Recreate the target from a snapshot that is present on the host.

        Every subvolume of the snapshot is snapshotted to its location below the target.
        Afterwards the absolute paths listed in the subvolume rules are passed to
        _create_fallback_subvolume.

        Args:
            host: The source host to restore on
            snapshot: The snapshot to restore
            existing_replaced_target: Location of the previous target, if it was moved aside
        """
        con = host.connection
        host.path(con.location.parent).mkdir(parents=True)

        for snapshot_subvol, subvolume_subvol_norm in zip(
            snapshot.subvolumes, snapshot.subvolumes_unescaped
        ):
            target_subvolume = host.path(con.location / subvolume_subvol_norm)
            # Presumably clears an empty placeholder directory, so that btrfs creates the
            # subvolume at exactly this path - TODO confirm against BackupHostPath.rmdir
            target_subvolume.rmdir()

            target_subvolume.parent.mkdir(parents=True)
            con.run_process(
                [
                    "btrfs",
                    "subvolume",
                    "snapshot",
                    str(snapshot.base_path / snapshot.name / snapshot_subvol),
                    str(target_subvolume),
                ]
            )

        for subvolume_str in host.target_config.subvolume_rules:
            subvolume_path = PurePath(subvolume_str)
            if not subvolume_path.is_absolute():
                # Relative rules are always skipped. The warning is only omitted for rules
                # which would drop the subvolume.
                if (
                    host.target_config.subvolume_rules[subvolume_str].fallback_strategy
                    != SubvolumeFallbackStrategy.DROP
                ):
                    log.warning(
                        'Can\'t recreate subvolumes from relative subvolume_rule path "%s". Skipped. Use an absolute Path.',
                        subvolume_str,
                    )

                continue

            self._create_fallback_subvolume(host, subvolume_path, existing_replaced_target)

    def _create_fallback_subvolume(
        self,
        host: SourceBackupTargetHost,
        subvolume_path: PurePath,
        existing_replaced_target: BackupHostPath | None = None,
    ) -> None:
        """
        Provide a subvolume that is missing in the restored target.

        Depending on the fallback strategy of the rule, the subvolume is taken over from
        the replaced target (KEEP, if it exists there), created empty (NEW, or KEEP
        without an existing one) or left out (any other strategy).

        Args:
            host: The source host to work on
            subvolume_path: Absolute subvolume path, used as key of the subvolume rules
            existing_replaced_target: Location of the previous target, if it was moved aside
        """
        subvolume_str = str(subvolume_path)
        rules = host.target_config.subvolume_rules[subvolume_str]

        # [1:] strips the leading "/" to get a path relative to the target.
        relative_subvolume = PurePath(subvolume_str[1:])
        target_subvolume_path = host.path() / relative_subvolume

        if target_subvolume_path.exists():
            log.debug("%s already exist", target_subvolume_path)
            return

        rt_subvolume: BackupHostPath | None = None
        if existing_replaced_target:
            rt_subvolume = existing_replaced_target / relative_subvolume

            if not rt_subvolume.exists():
                rt_subvolume = None

        target_subvolume_path.parent.mkdir(parents=True)

        if rules.fallback_strategy == SubvolumeFallbackStrategy.KEEP and rt_subvolume:
            rt_subvolume.rename(target_subvolume_path)

        # KEEP only gets here when there is nothing to take over.
        elif rules.fallback_strategy in (
            SubvolumeFallbackStrategy.NEW,
            SubvolumeFallbackStrategy.KEEP,
        ):
            host.connection.run_process(
                ["btrfs", "subvolume", "create", str(target_subvolume_path)]
            )

    def _clean_target(
        self,
        src_host: SourceBackupTargetHost,
        dst_host: DestinationBackupTargetHost | None,
        retention_names: ChoiceSelector,
    ) -> None:
        """
        Apply the retention rules on the destination (if available) and on the source.

        Args:
            src_host: An active source host instance
            dst_host: An active destination host instance or None in offline mode
            retention_names: Selection of the retention rulesets to apply
        """
        src_retentions: list[RetentionGroup] = []
        src_dst_retentions: list[RetentionGroup] = []
        dst_retentions: list[RetentionGroup] = []
        for retention_name in retention_names.resolve_retention_name(src_host.snapshots()):
            src_retentions.append(
                RetentionGroup.from_target(
                    retention_name=retention_name,
                    target=src_host.target_config,
                    is_source=True,
                )
            )
            # We want to make sure that unsent snapshots (from an offline backup) are not deleted.
            # They can only be deleted, if the dst_retention is flagging the snapshot as obsolete, too
            src_dst_retentions.append(
                RetentionGroup.from_target(
                    retention_name=retention_name,
                    target=src_host.target_config,
                    is_source=False,
                )
            )

        if dst_host:
            for retention_name in retention_names.resolve_retention_name(dst_host.snapshots()):
                dst_retentions.append(
                    RetentionGroup.from_target(
                        retention_name=retention_name,
                        target=dst_host.target_config,
                        is_source=False,
                    )
                )
            self._apply_retention(dst_host, dst_retentions)

            # Snapshots that were sent already, however, can be deleted if they are not
            # retained through the src_retention
            dst_snapshots = set(dst_host.snapshots())
            for retention in src_dst_retentions:
                retention.obsolete_snapshots = dst_snapshots

        self._apply_retention(src_host, src_retentions + src_dst_retentions)

    def _apply_retention(
        self,
        host: BackupTargetHost,
        retentions: Iterable[RetentionGroup],
    ) -> None:
        """
        Delete every snapshot on the host that none of the retention groups retains.

        Snapshots which are retained by non-source groups only are not deleted entirely:
        just the subvolumes returned by host.source_subvolumes_from_snapshot are removed.

        Args:
            host: The host to clean
            retentions: The retention groups to evaluate
        """
        # The groups are iterated twice, which would silently fail for a generator.
        retentions = list(retentions)

        # We only want to clean the selected retention_names
        snapshots = self._filter_snapshots(host.snapshots(), {x.name for x in retentions})

        retained_destination_snapshots: set[str] = set()
        retained_source_snapshots: set[str] = set()
        for retention in retentions:
            retained = self._retained_snapshots(
                snapshots,
                retention.target_retention,
                retention.name,
                retention.obsolete_snapshots,
            )
            if retention.is_source:
                retained_source_snapshots |= retained
            else:
                retained_destination_snapshots |= retained

        for snapshot_name in sorted(
            snapshots.keys() - (retained_source_snapshots | retained_destination_snapshots)
        ):
            host.delete_snapshot(snapshots[snapshot_name])

        for snapshot_name in sorted(retained_destination_snapshots - retained_source_snapshots):
            host.delete_snapshot(
                snapshots[snapshot_name],
                subvolumes=list(host.source_subvolumes_from_snapshot(snapshots[snapshot_name])),
            )

    def _filter_snapshots(
        self, snapshots: dict[str, Snapshot], retention_names: Iterable[str]
    ) -> dict[str, Snapshot]:
        """Return the snapshots whose retention name is one of the given names."""
        # A set allows any iterable (even a generator) and makes the lookups cheap.
        selected_names = set(retention_names)

        return {
            k: v for k, v in snapshots.items() if self._extract_retention_name(k) in selected_names
        }

    def _extract_retention_name(self, snapshot_name: str) -> str:
        """
        Return the part of the snapshot name after the first underscore.

        Raises:
            IndexError: If the snapshot name does not contain an underscore
        """
        return snapshot_name.split("_", maxsplit=1)[1]

    def _clean_replace(self, host: SourceBackupTargetHost) -> None:
        """
        Remove obsolete replaced targets.

        Only the newest replaced target can be kept, and only as long as it is not older
        than the replaced_target_ttl of the target config. All others are removed.
        """
        replaced_targets_dir = self._replaced_targets_dir(host)

        replaced_targets_dir.mkdir(parents=True)

        # The entries are named by timestamp, so reverse sorting yields the newest first.
        for i, replaced_target in enumerate(sorted(replaced_targets_dir.iterdir(), reverse=True)):
            # I'm doing an off-label use of this function here
            # to test if the replace is obsolete
            if i == 0 and self._retained_snapshots(
                [replaced_target.name], {"all": host.target_config.replaced_target_ttl}
            ):
                continue

            self._remove_replaced_targets(host, replaced_target)

    def _clean_empty_dirs(
        self,
        src_host: SourceBackupTargetHost,
        dst_host: DestinationBackupTargetHost | None,
    ) -> None:
        """Remove empty directories on the source and, if available, on the destination."""
        src_host.remove_empty_dirs(src_host.snapshot_dir)
        if dst_host:
            dst_host.remove_empty_dirs(dst_host.path())

    def _remove_replaced_targets(
        self, host: SourceBackupTargetHost, replaced_target: PurePath
    ) -> None:
        """Delete all subvolumes of the host which are located at or below replaced_target."""
        target_subvolumes = [x for x in host.subvolumes() if x.is_relative_to(replaced_target)]

        # Presumably host.subvolumes() lists parents before their children, so the
        # reversed order deletes nested subvolumes first - TODO confirm
        for subvolume in reversed(target_subvolumes):
            host.connection.run_process(["btrfs", "subvolume", "delete", str(subvolume)])

    def _transpose_snapshot_subvolumes(
        self, snapshots: dict[str, Snapshot]
    ) -> dict[BackupHostPath, set[str]]:
        """Map every subvolume to the names of the snapshots which contain it."""
        return_dict: dict[BackupHostPath, set[str]] = {}
        for snapshot_name, snapshot in snapshots.items():
            for subvolume in snapshot.subvolumes:
                return_dict.setdefault(subvolume, set()).add(snapshot_name)

        return return_dict

    def _retained_snapshots(
        self,
        snapshot_names: Iterable[str],
        retention: dict[str, str],
        retention_name: str | None = None,
        ignored_snapshots: set[str] | None = None,
    ) -> set[str]:
        """
        Determine which snapshots have to be kept according to a retention ruleset.

        Args:
            snapshot_names: Names of the existing snapshots
            retention: Retention rules, mapping an interval to a duration
            retention_name: Only snapshots with this retention name are considered.
                If empty or None, all names are considered.
            ignored_snapshots: Names which are never part of the result

        Returns:
            Names of the snapshots to keep
        """
        ignored_snapshots = ignored_snapshots or set()

        # The timestamps are written in self.timezone (see generate_snapshot_name), so they
        # have to be read in that timezone as well. Without tzinfo arrow assumes UTC and
        # every age check against "now" would be off by the UTC offset.
        snapshot_dates = [
            arrow.get(x.split("_")[0], self._timestamp_fmt, tzinfo=self.timezone)
            for x in snapshot_names
            if not retention_name or self._extract_retention_name(x) == retention_name
        ]

        remaining_backups: set[arrow.Arrow] = set()
        for interval, duration in retention.items():
            remaining_backups.update(self._apply_retention_rule(interval, duration, snapshot_dates))

        # Rebuild the snapshot names from the retained timestamps.
        suffix = f"_{retention_name}" if retention_name is not None else ""
        retained_names = {x.format(self._timestamp_fmt) + suffix for x in remaining_backups}

        return {item for item in retained_names if item not in ignored_snapshots}

    def _apply_retention_rule(
        self, interval_str: str, duration_str: str, dates: list[arrow.Arrow]
    ) -> list[arrow.Arrow]:
        """
        Select the dates which a single retention rule keeps.

        Starting with the newest date, a date is kept if it is the first one or lies
        outside of the interval window of the previously kept date. The selection stops at
        the first date beyond the duration, which is either a maximum age or a maximum count.

        Args:
            interval_str: Minimal distance between two kept dates or "all"
            duration_str: Maximum age, maximum count (number without unit) or "forever"
            dates: The dates to choose from

        Returns:
            The kept dates, newest first

        Raises:
            InvalidRetentionRuleError: If a rule can't be parsed or the interval has no time unit
        """
        interval_size, interval_magnitude = self._timebox_str_extract(
            interval_str, is_interval=True
        )
        duration_size, duration_magnitude = self._timebox_str_extract(
            duration_str, is_interval=False
        )

        # Both limits do not depend on the date, so they are determined once.
        # A duration with a time unit is an age limit, one without a unit is a count limit.
        expiry: arrow.Arrow | None = None
        if duration_magnitude not in (None, "forever"):
            expiry = arrow.utcnow().to(self.timezone).shift(**{duration_magnitude: -duration_size})
        max_count = duration_size if duration_magnitude is None else None

        remaining: list[arrow.Arrow] = []
        for date in sorted(dates, reverse=True):
            if expiry is not None and date < expiry:
                break

            if max_count is not None and len(remaining) >= max_count:
                break

            if not remaining or interval_magnitude == "all":
                remaining.append(date)
                continue

            # An explicit error instead of an assert, which would vanish with "python -O".
            if interval_magnitude is None:
                raise exceptions.InvalidRetentionRuleError(
                    f'Interval "{interval_str}" needs a time unit or has to be "all"'
                )

            min_box, max_box = (
                remaining[-1]
                .shift(**{interval_magnitude: 1 - interval_size})
                .span(interval_magnitude, count=interval_size)  # type: ignore
            )
            if date < min_box or date >= max_box:
                remaining.append(date)

        return remaining

    def _timebox_str_extract(
        self, timebox_str: str, is_interval: bool = False
    ) -> tuple[int, str | None]:
        """
        Split a retention interval or duration into its size and its time unit.

        Args:
            timebox_str: The string to parse, e.g. "3days", "5", "all" or "forever"
            is_interval: True accepts the keyword "all", False the keyword "forever"

        Returns:
            Size and unit. The unit is in plural form ("days"), None if only a number
            was given or the matched keyword ("all"/"forever") with a size of 0.

        Raises:
            InvalidRetentionRuleError: If the string is neither an accepted keyword nor
                matches the size pattern
        """
        if is_interval and timebox_str == "all":
            return 0, "all"

        if not is_interval and timebox_str == "forever":
            return 0, "forever"

        size_match = self._size_pattern.match(timebox_str)
        if not size_match:
            raise exceptions.InvalidRetentionRuleError(
                f"Size pattern ({timebox_str}, interval:{is_interval}) is invalid"
            )

        # The number is a mandatory part of the pattern, so size is always set here.
        size, magnitude = size_match.groups()

        # The callers use the unit as keyword argument of arrow's shift(), which takes plural names.
        if magnitude is not None:
            magnitude += "s"

        return int(size), magnitude