tw22
This commit is contained in:
@@ -22,7 +22,6 @@ DB_CONFIG = {
|
||||
}
|
||||
|
||||
BASE_DIR = Path(r"d:\Dropbox\Ordinace\Dokumentace_ke_zpracování\MP")
|
||||
# BASE_DIR = Path(r"u:\Dropbox\Ordinace\Dokumentace_ke_zpracování\MP")
|
||||
BASE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
@@ -30,23 +29,21 @@ def sanitize_name(name: str) -> str:
|
||||
"""Replace invalid filename characters with underscore."""
|
||||
return re.sub(r'[<>:"/\\|?*\x00-\x1F]', "_", name).strip()
|
||||
|
||||
|
||||
def make_abbrev(title: str) -> str:
|
||||
"""
|
||||
Create abbreviation from displayTitle:
|
||||
- First letter of each word
|
||||
- Keep leading digits together (COVID 19 → C19)
|
||||
- Remove spaces
|
||||
- Keep digits together
|
||||
- Uppercase
|
||||
"""
|
||||
if not title:
|
||||
return ""
|
||||
|
||||
# Words: letters or digits
|
||||
words = re.findall(r"[A-Za-zÁ-Žá-ž0-9]+", title)
|
||||
|
||||
abbr = ""
|
||||
for w in words:
|
||||
# If the word is pure digits → use whole word
|
||||
if w.isdigit():
|
||||
abbr += w
|
||||
else:
|
||||
@@ -54,17 +51,28 @@ def make_abbrev(title: str) -> str:
|
||||
|
||||
return abbr.upper()
|
||||
|
||||
|
||||
# ==============================
|
||||
# 🧹 DELETE UNEXPECTED FILES
|
||||
# ==============================
|
||||
def clean_folder(folder: Path, valid_files: set):
|
||||
"""Remove all files in folder that are NOT present in valid_files."""
|
||||
"""
|
||||
Remove unexpected files.
|
||||
RULE:
|
||||
- Files starting with `▲` are ALWAYS kept.
|
||||
"""
|
||||
if not folder.exists():
|
||||
return
|
||||
|
||||
for f in folder.iterdir():
|
||||
if f.is_file():
|
||||
if sanitize_name(f.name) not in valid_files:
|
||||
|
||||
# zpracované soubory (▲filename.pdf) nikdy nemažeme
|
||||
if f.name.startswith("▲"):
|
||||
continue
|
||||
|
||||
sanitized = sanitize_name(f.name)
|
||||
if sanitized not in valid_files:
|
||||
print(f"🗑️ Removing unexpected file: {f.name}")
|
||||
try:
|
||||
f.unlink()
|
||||
@@ -89,7 +97,8 @@ cur_meta.execute("""
|
||||
d.created_at,
|
||||
p.updatedAt AS req_updated_at,
|
||||
p.pacient_jmeno AS jmeno,
|
||||
p.pacient_prijmeni AS prijmeni
|
||||
p.pacient_prijmeni AS prijmeni,
|
||||
p.displayTitle
|
||||
FROM medevio_downloads d
|
||||
JOIN pozadavky p ON d.request_id = p.id
|
||||
ORDER BY p.updatedAt DESC
|
||||
@@ -98,6 +107,7 @@ cur_meta.execute("""
|
||||
rows = cur_meta.fetchall()
|
||||
print(f"📋 Found {len(rows)} attachment records.\n")
|
||||
|
||||
|
||||
# ==============================
|
||||
# 🧠 MAIN LOOP
|
||||
# ==============================
|
||||
@@ -124,55 +134,64 @@ for r in rows:
|
||||
prijmeni = sanitize_name(r["prijmeni"] or "Unknown")
|
||||
jmeno = sanitize_name(r["jmeno"] or "")
|
||||
title = r.get("displayTitle") or ""
|
||||
abbr = make_abbrev(title) # e.g. "POPC19"
|
||||
abbr = make_abbrev(title)
|
||||
|
||||
folder_name = f"{date_str} {prijmeni}, {jmeno} {abbr} {req_id}"
|
||||
folder_name = f"{date_str} {prijmeni}, {jmeno} {req_id}"
|
||||
folder_name = sanitize_name(folder_name)
|
||||
main_folder = BASE_DIR / folder_name
|
||||
clean_folder_name = sanitize_name(
|
||||
f"{date_str} {prijmeni}, {jmeno} [{abbr}] {req_id}"
|
||||
)
|
||||
|
||||
# ========== FIND OLD FOLDER (DUPLICATE) ==========
|
||||
# Any folder that contains "_<req_id>" and is not main_folder is duplicate
|
||||
# ========== DETECT EXISTING FOLDER (WITH OR WITHOUT ▲) ==========
|
||||
existing_folder = None
|
||||
folder_has_flag = False
|
||||
|
||||
for f in BASE_DIR.iterdir():
|
||||
if f.is_dir() and req_id in f.name:
|
||||
existing_folder = f
|
||||
folder_has_flag = ("▲" in f.name)
|
||||
break
|
||||
|
||||
# pokud složka existuje → pracujeme v ní
|
||||
main_folder = existing_folder if existing_folder else BASE_DIR / clean_folder_name
|
||||
|
||||
# ========== MERGE DUPLICATES ==========
|
||||
possible_dups = [
|
||||
f for f in BASE_DIR.iterdir()
|
||||
if f.is_dir() and req_id in f.name and f != main_folder
|
||||
]
|
||||
|
||||
# ========== MERGE DUPLICATES ==========
|
||||
for dup in possible_dups:
|
||||
print(f"♻️ Merging duplicate folder: {dup.name}")
|
||||
|
||||
# 1) Clean unexpected files in dup
|
||||
clean_folder(dup, valid_files)
|
||||
|
||||
# 2) Move files from dup to main folder
|
||||
main_folder.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for f in dup.iterdir():
|
||||
if f.is_file():
|
||||
# prostě přesuneme, ▲ případně zůstane v názvu
|
||||
target = main_folder / f.name
|
||||
if not target.exists():
|
||||
f.rename(target)
|
||||
|
||||
# 3) Remove the duplicate folder
|
||||
try:
|
||||
shutil.rmtree(dup, ignore_errors=True)
|
||||
except Exception as e:
|
||||
print(f"⚠️ Could not delete duplicate folder {dup}: {e}")
|
||||
shutil.rmtree(dup, ignore_errors=True)
|
||||
|
||||
# ========== CLEAN MAIN FOLDER ==========
|
||||
clean_folder(main_folder, valid_files)
|
||||
|
||||
# ========== DOWNLOAD MISSING FILES ==========
|
||||
added_new_file = False
|
||||
main_folder.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for filename in valid_files:
|
||||
dest = main_folder / filename
|
||||
if dest.exists():
|
||||
dest_plain = main_folder / filename
|
||||
dest_marked = main_folder / ("▲" + filename)
|
||||
|
||||
# soubor už existuje (buď filename, nebo ▲filename)
|
||||
if dest_plain.exists() or dest_marked.exists():
|
||||
continue
|
||||
|
||||
# fetch blob only now
|
||||
start = time.perf_counter()
|
||||
# stáhneme nový soubor → znamená že se má odstranit ▲ složky
|
||||
added_new_file = True
|
||||
|
||||
cur_blob.execute(
|
||||
"SELECT file_content FROM medevio_downloads "
|
||||
"WHERE request_id=%s AND filename=%s",
|
||||
@@ -181,17 +200,37 @@ for r in rows:
|
||||
row = cur_blob.fetchone()
|
||||
if not row:
|
||||
continue
|
||||
end = time.perf_counter()
|
||||
print(f"⏱ Took {end - start:.4f} seconds")
|
||||
|
||||
content = row[0]
|
||||
if not content:
|
||||
continue
|
||||
|
||||
with open(dest, "wb") as f:
|
||||
with open(dest_plain, "wb") as f:
|
||||
f.write(content)
|
||||
|
||||
print(f"💾 Wrote: {dest.relative_to(BASE_DIR)}")
|
||||
print(f"💾 Wrote: {dest_plain.relative_to(BASE_DIR)}")
|
||||
|
||||
# ==============================
|
||||
# 🔵 REMOVE FOLDER-LEVEL ▲ ONLY IF NEW FILE ADDED
|
||||
# ==============================
|
||||
if added_new_file:
|
||||
# složka se má přejmenovat bez ▲
|
||||
if "▲" in main_folder.name:
|
||||
new_name = main_folder.name.replace("▲", "")
|
||||
new_name = new_name.strip() # pro jistotu
|
||||
new_path = main_folder.parent / new_name
|
||||
|
||||
if new_path != main_folder:
|
||||
try:
|
||||
main_folder.rename(new_path)
|
||||
print(f"🔄 Folder flag ▲ removed → {new_name}")
|
||||
main_folder = new_path
|
||||
except Exception as e:
|
||||
print(f"⚠️ Could not rename folder: {e}")
|
||||
else:
|
||||
# žádné nové soubory → NIKDY nesahat na název složky
|
||||
pass
|
||||
|
||||
|
||||
print("\n🎯 Export complete.\n")
|
||||
|
||||
|
||||
239
Testy/20 Test.py
Normal file
239
Testy/20 Test.py
Normal file
@@ -0,0 +1,239 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import pymysql
|
||||
import re
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
import time
|
||||
|
||||
# ==============================
|
||||
# ⚙️ CONFIGURATION
|
||||
# ==============================
|
||||
DB_CONFIG = {
|
||||
"host": "192.168.1.76",
|
||||
"port": 3307,
|
||||
"user": "root",
|
||||
"password": "Vlado9674+",
|
||||
"database": "medevio",
|
||||
"charset": "utf8mb4",
|
||||
}
|
||||
|
||||
BASE_DIR = Path(r"d:\Dropbox\Ordinace\Dokumentace_ke_zpracování\MP")
|
||||
BASE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def sanitize_name(name: str) -> str:
|
||||
"""Replace invalid filename characters with underscore."""
|
||||
return re.sub(r'[<>:"/\\|?*\x00-\x1F]', "_", name).strip()
|
||||
|
||||
|
||||
def make_abbrev(title: str) -> str:
|
||||
"""
|
||||
Create abbreviation from displayTitle:
|
||||
- First letter of each word
|
||||
- Keep digits together
|
||||
- Uppercase
|
||||
"""
|
||||
if not title:
|
||||
return ""
|
||||
|
||||
words = re.findall(r"[A-Za-zÁ-Žá-ž0-9]+", title)
|
||||
|
||||
abbr = ""
|
||||
for w in words:
|
||||
if w.isdigit():
|
||||
abbr += w
|
||||
else:
|
||||
abbr += w[0]
|
||||
|
||||
return abbr.upper()
|
||||
|
||||
|
||||
# ==============================
|
||||
# 🧹 DELETE UNEXPECTED FILES
|
||||
# ==============================
|
||||
def clean_folder(folder: Path, valid_files: set):
|
||||
"""
|
||||
Remove unexpected files.
|
||||
RULE:
|
||||
- Files starting with `▲` are ALWAYS kept.
|
||||
"""
|
||||
if not folder.exists():
|
||||
return
|
||||
|
||||
for f in folder.iterdir():
|
||||
if f.is_file():
|
||||
|
||||
# zpracované soubory (▲filename.pdf) nikdy nemažeme
|
||||
if f.name.startswith("▲"):
|
||||
continue
|
||||
|
||||
sanitized = sanitize_name(f.name)
|
||||
if sanitized not in valid_files:
|
||||
print(f"🗑️ Removing unexpected file: {f.name}")
|
||||
try:
|
||||
f.unlink()
|
||||
except Exception as e:
|
||||
print(f"⚠️ Could not delete {f}: {e}")
|
||||
|
||||
|
||||
# ==============================
|
||||
# 📦 DB CONNECTION
|
||||
# ==============================
|
||||
conn = pymysql.connect(**DB_CONFIG)
|
||||
|
||||
cur_meta = conn.cursor(pymysql.cursors.DictCursor)
|
||||
cur_blob = conn.cursor()
|
||||
|
||||
print("🔍 Loading metadata from DB (FAST)…")
|
||||
|
||||
cur_meta.execute("""
|
||||
SELECT d.id AS download_id,
|
||||
d.request_id,
|
||||
d.filename,
|
||||
d.created_at,
|
||||
p.updatedAt AS req_updated_at,
|
||||
p.pacient_jmeno AS jmeno,
|
||||
p.pacient_prijmeni AS prijmeni,
|
||||
p.displayTitle
|
||||
FROM medevio_downloads d
|
||||
JOIN pozadavky p ON d.request_id = p.id
|
||||
ORDER BY p.updatedAt DESC
|
||||
""")
|
||||
|
||||
rows = cur_meta.fetchall()
|
||||
print(f"📋 Found {len(rows)} attachment records.\n")
|
||||
|
||||
|
||||
# ==============================
|
||||
# 🧠 MAIN LOOP
|
||||
# ==============================
|
||||
processed_requests = set()
|
||||
|
||||
for r in rows:
|
||||
req_id = r["request_id"]
|
||||
|
||||
if req_id in processed_requests:
|
||||
continue
|
||||
processed_requests.add(req_id)
|
||||
|
||||
# ========== FETCH ALL VALID FILES FOR THIS REQUEST ==========
|
||||
cur_meta.execute(
|
||||
"SELECT filename FROM medevio_downloads WHERE request_id=%s",
|
||||
(req_id,)
|
||||
)
|
||||
valid_files = {sanitize_name(row["filename"]) for row in cur_meta.fetchall()}
|
||||
|
||||
# ========== FOLDER NAME BASED ON UPDATEDAT ==========
|
||||
updated_at = r["req_updated_at"] or datetime.now()
|
||||
date_str = updated_at.strftime("%Y-%m-%d")
|
||||
|
||||
prijmeni = sanitize_name(r["prijmeni"] or "Unknown")
|
||||
jmeno = sanitize_name(r["jmeno"] or "")
|
||||
title = r.get("displayTitle") or ""
|
||||
abbr = make_abbrev(title)
|
||||
|
||||
clean_folder_name = sanitize_name(
|
||||
f"{date_str} {prijmeni}, {jmeno} [{abbr}] {req_id}"
|
||||
)
|
||||
|
||||
# ========== DETECT EXISTING FOLDER (WITH OR WITHOUT ▲) ==========
|
||||
existing_folder = None
|
||||
folder_has_flag = False
|
||||
|
||||
for f in BASE_DIR.iterdir():
|
||||
if f.is_dir() and req_id in f.name:
|
||||
existing_folder = f
|
||||
folder_has_flag = ("▲" in f.name)
|
||||
break
|
||||
|
||||
# pokud složka existuje → pracujeme v ní
|
||||
main_folder = existing_folder if existing_folder else BASE_DIR / clean_folder_name
|
||||
|
||||
# ========== MERGE DUPLICATES ==========
|
||||
possible_dups = [
|
||||
f for f in BASE_DIR.iterdir()
|
||||
if f.is_dir() and req_id in f.name and f != main_folder
|
||||
]
|
||||
|
||||
for dup in possible_dups:
|
||||
print(f"♻️ Merging duplicate folder: {dup.name}")
|
||||
|
||||
clean_folder(dup, valid_files)
|
||||
main_folder.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for f in dup.iterdir():
|
||||
if f.is_file():
|
||||
# prostě přesuneme, ▲ případně zůstane v názvu
|
||||
target = main_folder / f.name
|
||||
if not target.exists():
|
||||
f.rename(target)
|
||||
|
||||
shutil.rmtree(dup, ignore_errors=True)
|
||||
|
||||
# ========== CLEAN MAIN FOLDER ==========
|
||||
clean_folder(main_folder, valid_files)
|
||||
|
||||
# ========== DOWNLOAD MISSING FILES ==========
|
||||
added_new_file = False
|
||||
main_folder.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for filename in valid_files:
|
||||
dest_plain = main_folder / filename
|
||||
dest_marked = main_folder / ("▲" + filename)
|
||||
|
||||
# soubor už existuje (buď filename, nebo ▲filename)
|
||||
if dest_plain.exists() or dest_marked.exists():
|
||||
continue
|
||||
|
||||
# stáhneme nový soubor → znamená že se má odstranit ▲ složky
|
||||
added_new_file = True
|
||||
|
||||
cur_blob.execute(
|
||||
"SELECT file_content FROM medevio_downloads "
|
||||
"WHERE request_id=%s AND filename=%s",
|
||||
(req_id, filename)
|
||||
)
|
||||
row = cur_blob.fetchone()
|
||||
if not row:
|
||||
continue
|
||||
|
||||
content = row[0]
|
||||
if not content:
|
||||
continue
|
||||
|
||||
with open(dest_plain, "wb") as f:
|
||||
f.write(content)
|
||||
|
||||
print(f"💾 Wrote: {dest_plain.relative_to(BASE_DIR)}")
|
||||
|
||||
# ==============================
|
||||
# 🔵 REMOVE FOLDER-LEVEL ▲ ONLY IF NEW FILE ADDED
|
||||
# ==============================
|
||||
if added_new_file:
|
||||
# složka se má přejmenovat bez ▲
|
||||
if "▲" in main_folder.name:
|
||||
new_name = main_folder.name.replace("▲", "")
|
||||
new_name = new_name.strip() # pro jistotu
|
||||
new_path = main_folder.parent / new_name
|
||||
|
||||
if new_path != main_folder:
|
||||
try:
|
||||
main_folder.rename(new_path)
|
||||
print(f"🔄 Folder flag ▲ removed → {new_name}")
|
||||
main_folder = new_path
|
||||
except Exception as e:
|
||||
print(f"⚠️ Could not rename folder: {e}")
|
||||
else:
|
||||
# žádné nové soubory → NIKDY nesahat na název složky
|
||||
pass
|
||||
|
||||
|
||||
print("\n🎯 Export complete.\n")
|
||||
|
||||
cur_blob.close()
|
||||
cur_meta.close()
|
||||
conn.close()
|
||||
Reference in New Issue
Block a user