Files
drobboxordinacebackup/migrate_to_zip.py
2026-02-12 07:38:16 +01:00

135 lines
4.6 KiB
Python

"""
One-time migration: convert plain .bak backup blobs to AES-256 encrypted .zip files.
Usage: python migrate_to_zip.py
Walks BACKUP_PATH, finds all .bak files, creates encrypted .zip for each,
then deletes the original .bak. Resumable: skips files where .zip already exists.
"""
import os
import sys
import time
import pyzipper
from indexer.config import BACKUP_PATH, BACKUP_PASSWORD
def collect_bak_files(backup_root: str) -> list:
"""Walk backup dir and collect all .bak file paths."""
bak_files = []
for dirpath, _dirnames, filenames in os.walk(backup_root):
for fn in filenames:
if fn.endswith(".bak"):
bak_files.append(os.path.join(dirpath, fn))
return bak_files
def migrate(backup_root: str, password: str):
print(f"Backup dir: {backup_root}")
print("Scanning for .bak files...")
bak_files = collect_bak_files(backup_root)
total = len(bak_files)
print(f"Found {total} .bak files to migrate.\n")
if total == 0:
print("Nothing to migrate.")
return
password_bytes = password.encode("utf-8")
converted = 0
skipped = 0
errors = 0
start_time = time.time()
try:
for i, bak_path in enumerate(bak_files, 1):
# Derive the .zip path from the .bak path
# e.g., ab/cd/abcdef...64hex.bak -> ab/cd/abcdef...64hex.zip
base = bak_path[:-4] # strip ".bak"
zip_path = base + ".zip"
hex_hash = os.path.basename(base) # the 64-char hex name
# Resume support: skip if .zip already exists
if os.path.exists(zip_path):
skipped += 1
if i % 500 == 0 or i == total:
elapsed = time.time() - start_time
print(f" [{i}/{total}] ({100*i//total}%) "
f"converted={converted} skipped={skipped} errors={errors} "
f"elapsed={elapsed:.0f}s")
continue
try:
# Create encrypted zip in a temp file, then rename
tmp_path = zip_path + ".tmp"
with pyzipper.AESZipFile(
tmp_path, "w",
compression=pyzipper.ZIP_DEFLATED,
encryption=pyzipper.WZ_AES,
) as zf:
zf.setpassword(password_bytes)
zf.write(bak_path, arcname=hex_hash + ".blob")
os.replace(tmp_path, zip_path)
# Verify the zip is valid before deleting original
with pyzipper.AESZipFile(zip_path, "r") as zf:
zf.setpassword(password_bytes)
names = zf.namelist()
if not names:
raise ValueError("ZIP is empty after creation")
# Delete original .bak
os.remove(bak_path)
converted += 1
except Exception as e:
print(f" ERROR: {bak_path}: {e}")
errors += 1
# Clean up temp file if it exists
if os.path.exists(zip_path + ".tmp"):
try:
os.remove(zip_path + ".tmp")
except OSError:
pass
continue
# Progress every 500 files
if i % 500 == 0 or i == total:
elapsed = time.time() - start_time
rate = converted / elapsed if elapsed > 0 else 0
eta = (total - i) / rate if rate > 0 else 0
print(f" [{i}/{total}] ({100*i//total}%) "
f"converted={converted} skipped={skipped} errors={errors} "
f"elapsed={elapsed:.0f}s ETA={eta:.0f}s")
except KeyboardInterrupt:
print(f"\n\nInterrupted by user at file {i}/{total}.")
print("Migration is resumable — run again to continue.")
elapsed = time.time() - start_time
print(f"\n{'='*60}")
print(f"Migration complete.")
print(f" Total .bak files : {total}")
print(f" Converted : {converted}")
print(f" Skipped (exists) : {skipped}")
print(f" Errors : {errors}")
print(f" Time : {elapsed:.0f}s")
print(f"{'='*60}")
if __name__ == "__main__":
if not BACKUP_PATH or not os.path.isdir(BACKUP_PATH):
print(f"ERROR: BACKUP_PATH is not a valid directory: {BACKUP_PATH}")
sys.exit(1)
if not BACKUP_PASSWORD:
print("ERROR: BACKUP_PASSWORD not set in .env")
sys.exit(1)
print("=" * 60)
print("MIGRATION: .bak -> encrypted .zip")
print(f"Backup dir: {BACKUP_PATH}")
print("=" * 60)
migrate(BACKUP_PATH, BACKUP_PASSWORD)