z230
This commit is contained in:
+1
-1
File diff suppressed because one or more lines are too long
BIN
Binary file not shown.
Binary file not shown.
+158
-21
@@ -1,7 +1,11 @@
|
|||||||
"""
|
"""
|
||||||
Import Panorama Issues & Deviations XLSX do MongoDB (databáze: Panorama).
|
Import Panorama XLSX reportů do MongoDB (databáze: Panorama).
|
||||||
|
|
||||||
|
Podporované typy:
|
||||||
|
- Issues & Deviations → kolekce IssuesAndDeviations (klíč: ID / fuzzy+hash)
|
||||||
|
- Site Visit Details → kolekce Visits (klíč: Site Visit ID (Technical))
|
||||||
|
- FUL details → kolekce FUL (klíč: SVR Document Number)
|
||||||
|
|
||||||
Kolekce: IssuesAndDeviations
|
|
||||||
Filtr: pouze řádky s Country Name == "Czechia"
|
Filtr: pouze řádky s Country Name == "Czechia"
|
||||||
Historie: při změně fields se stará verze uloží do pole history[]
|
Historie: při změně fields se stará verze uloží do pole history[]
|
||||||
|
|
||||||
@@ -10,6 +14,7 @@ Použití:
|
|||||||
python import_to_mongo.py Downloads/konkretni.xlsx # jeden soubor
|
python import_to_mongo.py Downloads/konkretni.xlsx # jeden soubor
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import sys
|
import sys
|
||||||
@@ -18,17 +23,51 @@ from pathlib import Path
|
|||||||
|
|
||||||
import openpyxl
|
import openpyxl
|
||||||
from pymongo import MongoClient, ASCENDING
|
from pymongo import MongoClient, ASCENDING
|
||||||
|
from rapidfuzz import fuzz
|
||||||
|
|
||||||
|
FUZZY_FIELDS = ("Description", "Comments", "Action Taken")
|
||||||
|
FUZZY_MIN_FIELDS = 2 # počet polí, která musí dosáhnout prahu
|
||||||
|
FUZZY_THRESHOLD = 90.0 # % shoda pro pole v 2-of-3 logice
|
||||||
|
FALLBACK_DESC_THRESHOLD = 95.0 # % shoda Description, když chybí druhé pole
|
||||||
|
|
||||||
MONGO_URI = "mongodb://192.168.1.76:27017"
|
MONGO_URI = "mongodb://192.168.1.76:27017"
|
||||||
DB_NAME = "Panorama"
|
DB_NAME = "Panorama"
|
||||||
COLLECTION_NAME = "IssuesAndDeviations"
|
|
||||||
DOWNLOADS_DIR = Path(__file__).parent / "Downloads"
|
DOWNLOADS_DIR = Path(__file__).parent / "Downloads"
|
||||||
PROCESSED_DIR = DOWNLOADS_DIR / "Zpracovano"
|
PROCESSED_DIR = DOWNLOADS_DIR / "Zpracovano"
|
||||||
|
|
||||||
COUNTRY_FILTER = None # None = všechny země
|
COUNTRY_FILTER = "Czechia" # None = všechny země
|
||||||
HEADER_ROW = 5 # 0-indexed řádek s hlavičkou
|
HEADER_ROW = 5 # 0-indexed řádek s hlavičkou
|
||||||
DATA_START_ROW = 6 # 0-indexed první datový řádek
|
DATA_START_ROW = 6 # 0-indexed první datový řádek
|
||||||
UPSERT_KEY = "ID" # unikátní klíč pro upsert
|
|
||||||
|
REPORT_TYPES = {
|
||||||
|
"IssuesAndDeviations": {
|
||||||
|
"pattern": re.compile(r"Deviations and Issues\.xlsx$", re.IGNORECASE),
|
||||||
|
"collection": "IssuesAndDeviations",
|
||||||
|
"upsert_key": "ID",
|
||||||
|
"indexes": [
|
||||||
|
"fields.Country Name", "fields.Site ID", "fields.Status",
|
||||||
|
"fields.Brief Description - Subject ID",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
"Visits": {
|
||||||
|
"pattern": re.compile(r"Site Visit Details\.xlsx$", re.IGNORECASE),
|
||||||
|
"collection": "Visits",
|
||||||
|
"upsert_key": "Site Visit ID (Technical)",
|
||||||
|
"indexes": [
|
||||||
|
"fields.Country Name", "fields.Site ID",
|
||||||
|
"fields.Site Visit Status", "fields.Site Visit Type",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
"FUL": {
|
||||||
|
"pattern": re.compile(r"FUL details\.xlsx$", re.IGNORECASE),
|
||||||
|
"collection": "FUL",
|
||||||
|
"upsert_key": "SVR Document Number",
|
||||||
|
"indexes": [
|
||||||
|
"fields.Country Name", "fields.Site ID",
|
||||||
|
"fields.FUL Missing?", "fields.FUL Document Status",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -40,6 +79,47 @@ def extract_snapshot_date(filename: str) -> str:
|
|||||||
return match.group(1) if match else datetime.now().strftime("%Y-%m-%d")
|
return match.group(1) if match else datetime.now().strftime("%Y-%m-%d")
|
||||||
|
|
||||||
|
|
||||||
|
def fuzzy_match_existing(collection, fields: dict):
    """Find an already-stored, ID-less record that fuzzily matches ``fields``.

    Candidates are fetched with an exact-match query on Part, Protocol ID,
    Site ID, Create Date and Brief Description - Subject ID, restricted by
    ``"fields.ID": None``. Each candidate is then compared with the incoming
    row on the FUZZY_FIELDS text columns using ``fuzz.ratio``; a column is
    scored only when it is non-empty on both sides.

    A candidate is accepted when either

    * at least FUZZY_MIN_FIELDS columns were scored and at least
      FUZZY_MIN_FIELDS of them reach FUZZY_THRESHOLD, or
    * fewer than FUZZY_MIN_FIELDS columns were scored and the Description
      score alone reaches FALLBACK_DESC_THRESHOLD.

    Args:
        collection: Object exposing ``find(filter)`` that yields documents
            carrying a ``fields`` sub-document.
        fields: Column-name -> value mapping of the incoming row.

    Returns:
        The first accepted candidate document, or None when nothing matches.
    """

    def _text(source: dict, name: str) -> str:
        # str() keeps a truthy non-string value (e.g. a numeric cell) from
        # crashing on .strip(); None and other falsy values still become "".
        return str(source.get(name) or "").strip()

    exact_keys = (
        "Part",
        "Protocol ID",
        "Site ID",
        "Create Date",
        "Brief Description - Subject ID",
    )
    query = {f"fields.{key}": fields.get(key) for key in exact_keys}
    # Only records stored without an ID are eligible for fuzzy matching.
    query["fields.ID"] = None

    new_vals = {name: _text(fields, name) for name in FUZZY_FIELDS}

    for cand in collection.find(query):
        # Treat a missing or null "fields" sub-document as empty.
        cand_fields = cand.get("fields") or {}
        cand_vals = {name: _text(cand_fields, name) for name in FUZZY_FIELDS}

        # Score only the columns that carry text on both sides.
        scores = {
            name: fuzz.ratio(new_vals[name], cand_vals[name])
            for name in FUZZY_FIELDS
            if new_vals[name] and cand_vals[name]
        }
        passing = [name for name, score in scores.items() if score >= FUZZY_THRESHOLD]

        if len(scores) >= FUZZY_MIN_FIELDS:
            if len(passing) >= FUZZY_MIN_FIELDS:
                return cand
        elif "Description" in scores and scores["Description"] >= FALLBACK_DESC_THRESHOLD:
            # Too few comparable columns: fall back to a stricter
            # Description-only comparison.
            return cand

    return None
|
||||||
|
|
||||||
|
|
||||||
|
def detect_report_type(filename: str) -> dict | None:
    """Return the REPORT_TYPES entry whose pattern matches ``filename``.

    Entries are tried in REPORT_TYPES order and the first one whose compiled
    ``pattern`` finds a match is returned; None means no entry matched.
    """
    matching = (
        candidate
        for candidate in REPORT_TYPES.values()
        if candidate["pattern"].search(filename)
    )
    return next(matching, None)
|
||||||
|
|
||||||
|
|
||||||
def clean_value(val):
|
def clean_value(val):
|
||||||
"""Převede datetime na ISO string, None nechá, zbytek strip."""
|
"""Převede datetime na ISO string, None nechá, zbytek strip."""
|
||||||
if val is None:
|
if val is None:
|
||||||
@@ -58,9 +138,12 @@ def clean_value(val):
|
|||||||
# Import jednoho souboru
|
# Import jednoho souboru
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
def import_file(xlsx_path: str, collection) -> dict:
|
def import_file(xlsx_path: str, collection, report_cfg: dict) -> dict:
|
||||||
filename = Path(xlsx_path).name
|
filename = Path(xlsx_path).name
|
||||||
snapshot_date = extract_snapshot_date(filename)
|
snapshot_date = extract_snapshot_date(filename)
|
||||||
|
upsert_key = report_cfg["upsert_key"]
|
||||||
|
collection_name = report_cfg["collection"]
|
||||||
|
use_fuzzy = (collection_name == "IssuesAndDeviations")
|
||||||
|
|
||||||
wb = openpyxl.load_workbook(xlsx_path, read_only=True)
|
wb = openpyxl.load_workbook(xlsx_path, read_only=True)
|
||||||
ws = wb[wb.sheetnames[0]]
|
ws = wb[wb.sheetnames[0]]
|
||||||
@@ -69,9 +152,12 @@ def import_file(xlsx_path: str, collection) -> dict:
|
|||||||
|
|
||||||
header = rows[HEADER_ROW]
|
header = rows[HEADER_ROW]
|
||||||
|
|
||||||
|
data_rows = [r for r in rows[DATA_START_ROW:] if any(v is not None for v in r)]
|
||||||
|
xlsx_count = len(data_rows)
|
||||||
|
|
||||||
inserted = changed = unchanged = filtered_out = 0
|
inserted = changed = unchanged = filtered_out = 0
|
||||||
|
|
||||||
for row in rows[DATA_START_ROW:]:
|
for row in data_rows:
|
||||||
raw = dict(zip(header, row))
|
raw = dict(zip(header, row))
|
||||||
|
|
||||||
country = (raw.get("Country Name") or "")
|
country = (raw.get("Country Name") or "")
|
||||||
@@ -79,18 +165,37 @@ def import_file(xlsx_path: str, collection) -> dict:
|
|||||||
filtered_out += 1
|
filtered_out += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
record_id = raw.get(UPSERT_KEY)
|
|
||||||
if record_id is None:
|
|
||||||
continue
|
|
||||||
record_id = str(int(record_id)) if isinstance(record_id, (int, float)) else str(record_id).strip()
|
|
||||||
|
|
||||||
fields = {}
|
fields = {}
|
||||||
for k, v in raw.items():
|
for k, v in raw.items():
|
||||||
if k is None:
|
if k is None:
|
||||||
continue
|
continue
|
||||||
fields[k] = clean_value(v)
|
fields[k] = clean_value(v)
|
||||||
|
|
||||||
|
record_id = raw.get(upsert_key)
|
||||||
|
has_id = record_id is not None
|
||||||
|
|
||||||
|
if has_id:
|
||||||
|
record_id = str(int(record_id)) if isinstance(record_id, (int, float)) else str(record_id).strip()
|
||||||
existing = collection.find_one({"record_id": record_id})
|
existing = collection.find_one({"record_id": record_id})
|
||||||
|
elif use_fuzzy:
|
||||||
|
existing = fuzzy_match_existing(collection, fields)
|
||||||
|
if existing is not None:
|
||||||
|
record_id = existing["record_id"]
|
||||||
|
else:
|
||||||
|
key_parts = [
|
||||||
|
str(raw.get("Part") or ""),
|
||||||
|
str(raw.get("Site ID") or ""),
|
||||||
|
str(raw.get("Create Date") or ""),
|
||||||
|
str(raw.get("Description") or ""),
|
||||||
|
str(raw.get("Brief Description - Subject ID") or ""),
|
||||||
|
str(raw.get("Comments") or ""),
|
||||||
|
str(raw.get("Action Taken") or ""),
|
||||||
|
]
|
||||||
|
h = hashlib.sha1("|".join(key_parts).encode("utf-8")).hexdigest()[:16]
|
||||||
|
record_id = f"H-{h}"
|
||||||
|
else:
|
||||||
|
filtered_out += 1
|
||||||
|
continue
|
||||||
|
|
||||||
if existing is None:
|
if existing is None:
|
||||||
doc = {
|
doc = {
|
||||||
@@ -129,14 +234,35 @@ def import_file(xlsx_path: str, collection) -> dict:
|
|||||||
)
|
)
|
||||||
unchanged += 1
|
unchanged += 1
|
||||||
|
|
||||||
|
processed = inserted + changed + unchanged + filtered_out
|
||||||
|
protocol_id = None
|
||||||
|
for row in data_rows[:50]:
|
||||||
|
raw = dict(zip(header, row))
|
||||||
|
pid = raw.get("Protocol ID")
|
||||||
|
if pid:
|
||||||
|
protocol_id = str(pid).strip()
|
||||||
|
break
|
||||||
|
db_count = collection.count_documents({"fields.Protocol ID": protocol_id}) if protocol_id else None
|
||||||
|
|
||||||
stats = {
|
stats = {
|
||||||
"snapshot": snapshot_date,
|
"snapshot": snapshot_date,
|
||||||
"inserted": inserted,
|
"inserted": inserted,
|
||||||
"changed": changed,
|
"changed": changed,
|
||||||
"unchanged": unchanged,
|
"unchanged": unchanged,
|
||||||
"filtered_out": filtered_out,
|
"filtered_out": filtered_out,
|
||||||
|
"xlsx_count": xlsx_count,
|
||||||
|
"db_count": db_count,
|
||||||
|
"protocol_id": protocol_id,
|
||||||
}
|
}
|
||||||
print(f" {COLLECTION_NAME} [{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{filtered_out} non-CZ")
|
print(f" {collection_name} [{snapshot_date}]: +{inserted} new, ~{changed} changed, ={unchanged} same, -{filtered_out} non-CZ")
|
||||||
|
|
||||||
|
expected_in_db = xlsx_count - filtered_out
|
||||||
|
|
||||||
|
if processed != xlsx_count:
|
||||||
|
print(f" !!! VAROVANI: zpracovano {processed} radku, ale v XLSX je {xlsx_count} datovych radku")
|
||||||
|
if db_count is not None and db_count != expected_in_db:
|
||||||
|
print(f" !!! VAROVANI: v DB je {db_count} dokumentu pro Protocol ID {protocol_id}, ocekavano {expected_in_db} (XLSX {xlsx_count} - filtered {filtered_out})")
|
||||||
|
|
||||||
return stats
|
return stats
|
||||||
|
|
||||||
|
|
||||||
@@ -166,21 +292,32 @@ def main():
|
|||||||
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
|
client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
|
||||||
client.admin.command("ping")
|
client.admin.command("ping")
|
||||||
db = client[DB_NAME]
|
db = client[DB_NAME]
|
||||||
collection = db[COLLECTION_NAME]
|
|
||||||
|
|
||||||
collection.create_index([("record_id", ASCENDING)], unique=True)
|
collections_cache: dict[str, any] = {}
|
||||||
collection.create_index([("fields.Country Name", ASCENDING)])
|
|
||||||
collection.create_index([("fields.Site ID", ASCENDING)])
|
def get_collection(cfg: dict):
|
||||||
collection.create_index([("fields.Status", ASCENDING)])
|
name = cfg["collection"]
|
||||||
collection.create_index([("fields.Brief Description - Subject ID", ASCENDING)])
|
if name not in collections_cache:
|
||||||
|
col = db[name]
|
||||||
|
col.create_index([("record_id", ASCENDING)], unique=True)
|
||||||
|
for idx_field in cfg["indexes"]:
|
||||||
|
col.create_index([(idx_field, ASCENDING)])
|
||||||
|
collections_cache[name] = col
|
||||||
|
return collections_cache[name]
|
||||||
|
|
||||||
PROCESSED_DIR.mkdir(exist_ok=True)
|
PROCESSED_DIR.mkdir(exist_ok=True)
|
||||||
|
|
||||||
total = {"inserted": 0, "changed": 0, "unchanged": 0}
|
total = {"inserted": 0, "changed": 0, "unchanged": 0}
|
||||||
|
|
||||||
for xlsx_path in paths:
|
for xlsx_path in paths:
|
||||||
print(f"Import: {xlsx_path.name}")
|
report_cfg = detect_report_type(xlsx_path.name)
|
||||||
stats = import_file(str(xlsx_path), collection)
|
if report_cfg is None:
|
||||||
|
print(f"PRESKAKUJI (neznamy typ): {xlsx_path.name}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
collection = get_collection(report_cfg)
|
||||||
|
print(f"Import: {xlsx_path.name} -> {report_cfg['collection']}")
|
||||||
|
stats = import_file(str(xlsx_path), collection, report_cfg)
|
||||||
for k in total:
|
for k in total:
|
||||||
total[k] += stats.get(k, 0)
|
total[k] += stats.get(k, 0)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user