Merge remote-tracking branch 'origin/main'

This commit is contained in:
2025-10-21 12:43:46 +02:00

View File

@@ -5,9 +5,8 @@
Import Fio banka CSV export (UTF-8, ; separated, quoted) Import Fio banka CSV export (UTF-8, ; separated, quoted)
into MySQL database `fio.transactions`. into MySQL database `fio.transactions`.
- Datum je vždy ve formátu dd.mm.yyyy. Unique key = (Číslo účtu, ID operace, ID pokynu)
- Přidává diagnostický fingerprint do sloupce `uniq_fp` (indexovaný). Duplicates are skipped silently.
- Zatím NEblokuje duplicity; jen je snadno zobrazí pro ladění.
""" """
import csv import csv
@@ -17,7 +16,7 @@ import pymysql
from pymysql.cursors import DictCursor from pymysql.cursors import DictCursor
# ======== CONFIG ======== # ======== CONFIG ========
CSV_PATH = Path(r"u:\Dropbox\!!!Days\Downloads Z230\Vyhledane pohyby (1).csv") CSV_PATH = Path(r"u:\Dropbox\!!!Days\Downloads Z230\Vyhledane pohyby (3).csv")
TABLE_NAME = "transactions" TABLE_NAME = "transactions"
MYSQL_CONFIG = { MYSQL_CONFIG = {
@@ -31,67 +30,73 @@ MYSQL_CONFIG = {
"autocommit": True, "autocommit": True,
} }
# ---------- helpers ----------
# ======== HELPERS ========
def clean(s: str):
    """Trim a text value and collapse empty results to ``None``.

    Args:
        s: Raw text; may be ``None`` or an empty string.

    Returns:
        The input with leading and trailing whitespace removed, or ``None``
        when the input is falsy or contains only whitespace. Letter case is
        preserved.
    """
    if not s:
        return None
    # A whitespace-only value strips down to "" and is reported as None.
    return s.strip() or None
def build_fingerprint(data: dict) -> str:
    """Build a diagnostic fingerprint string from normalized row fields.

    The fingerprint is the ``|``-joined sequence of: date (``YYYY-MM-DD``),
    amount with two decimals, account number, counter-account, bank code,
    variable symbol, and the first 100 characters of the message and of the
    note. Missing or ``None`` fields contribute an empty part.

    Args:
        data: Mapping with the keys ``datum``, ``objem``, ``cislo_uctu``,
            ``protiucet``, ``kod_banky``, ``vs``, ``zprava`` and ``poznamka``.
            ``datum`` must offer ``strftime`` and ``objem`` must be a number
            when present.

    Returns:
        The fingerprint string; it always contains exactly seven ``|``.
    """
    datum = data.get("datum")
    objem = data.get("objem")
    parts = [
        datum.strftime("%Y-%m-%d") if datum else "",
        f"{objem:.2f}" if objem is not None else "",
        data.get("cislo_uctu") or "",
        data.get("protiucet") or "",
        data.get("kod_banky") or "",
        data.get("vs") or "",
        # Free-text fields are truncated so the fingerprint stays bounded.
        (data.get("zprava") or "")[:100],
        (data.get("poznamka") or "")[:100],
    ]
    return "|".join(parts)
def parse_date(raw: str, fmt: str = "%d.%m.%Y"):
    """Parse a date string into a ``datetime.date``.

    Args:
        raw: Text to parse. ``None`` is treated as an empty string and
            surrounding whitespace is ignored.
        fmt: ``strptime`` format of the input; defaults to dd.mm.yyyy.

    Returns:
        The parsed date, or ``None`` when the input is empty or does not
        match ``fmt``.
    """
    text = (raw or "").strip()
    if not text:
        return None
    try:
        return datetime.strptime(text, fmt).date()
    except ValueError:
        # Unparseable dates are reported as None rather than raised.
        return None
def parse_float(raw: str):
    """Convert a number written with a decimal comma and spaced digit groups.

    Args:
        raw: Text such as ``"1 234,56"``; ``None`` is treated as empty.

    Returns:
        The value as a ``float``, or ``None`` when the text is empty, is not
        a number, or parses to a non-finite value (``nan`` / ``inf``).
    """
    import math

    # split() without a separator splits on every Unicode whitespace
    # character, so no-break spaces used as thousands separators are removed
    # together with ordinary spaces.
    compact = "".join((raw or "").split()).replace(",", ".")
    try:
        value = float(compact)
    except ValueError:
        return None
    # float() accepts "nan" and "inf"; those are not usable amounts.
    return value if math.isfinite(value) else None
# ======== DB ========
def get_mysql_connection():
    """Open a new MySQL connection using the module-level ``MYSQL_CONFIG``.

    Returns:
        The connection object returned by ``pymysql.connect``.
    """
    return pymysql.connect(**MYSQL_CONFIG)
def ensure_table_exists(conn):
    """Create the transactions table if it doesn't exist yet.

    The table carries a unique key on (cislo_uctu, id_operace, id_pokynu).

    Args:
        conn: Open database connection; only ``conn.cursor()`` is used.
    """
    # NOTE(review): the three uniq_tx columns are nullable, and MySQL unique
    # indexes allow any number of rows whose key contains NULL. Rows with a
    # missing id_operace / id_pokynu are therefore never treated as
    # duplicates - confirm whether these columns should be NOT NULL.
    sql = f"""
    CREATE TABLE IF NOT EXISTS `{TABLE_NAME}` (
        id INT AUTO_INCREMENT PRIMARY KEY,
        datum DATE,
        castka DECIMAL(14,2),
        akce VARCHAR(100),
        cislo_uctu VARCHAR(40),
        id_operace VARCHAR(50),
        id_pokynu VARCHAR(50),
        protiucet VARCHAR(40),
        nazev_protiuctu VARCHAR(200),
        kod_banky VARCHAR(20),
        ks VARCHAR(20),
        vs VARCHAR(20),
        ss VARCHAR(20),
        zprava_pro_prijemce VARCHAR(500),
        poznamka VARCHAR(500),
        reference_platce VARCHAR(200),
        typ VARCHAR(100),
        upresneni VARCHAR(500),
        zadal VARCHAR(200),
        zdrojovy_ucet VARCHAR(50),
        nazev_banky VARCHAR(100),
        imported_at DATETIME DEFAULT CURRENT_TIMESTAMP,
        UNIQUE KEY uniq_tx (cislo_uctu, id_operace, id_pokynu)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    """
    with conn.cursor() as cur:
        cur.execute(sql)
    print(f"✅ Tabulka `{TABLE_NAME}` zkontrolována nebo vytvořena.")
# ======== IMPORT ======== # ======== IMPORT ========
def import_fio_csv(): def import_fio_csv():
@@ -99,71 +104,60 @@ def import_fio_csv():
reader = csv.DictReader(f, delimiter=";", quotechar='"') reader = csv.DictReader(f, delimiter=";", quotechar='"')
rows = list(reader) rows = list(reader)
total_rows = len(rows)
print(f"📄 Načteno {total_rows} řádků ze souboru {CSV_PATH.name}")
with get_mysql_connection() as conn: with get_mysql_connection() as conn:
ensure_table_exists(conn) ensure_table_exists(conn)
inserted = 0 inserted, skipped = 0, 0
for row in rows: for i, row in enumerate(rows, start=1):
# --- DATUM ---
raw_date = (row.get("Datum") or "").strip()
try:
datum = datetime.strptime(raw_date, "%d.%m.%Y").date() if raw_date else None
except ValueError:
datum = None # případně continue, pokud chceš řádek zahodit
# --- OBJEM ---
objem_str = (row.get("Objem") or "").replace(" ", "").replace(",", ".")
try:
objem = float(objem_str)
except ValueError:
objem = None
# --- normalizace textů ---
data = { data = {
"datum": datum, "datum": parse_date(row.get("Datum")),
"objem": objem, "castka": parse_float(row.get("Částka")),
"mena": clean(row.get("Měna")), "akce": clean(row.get("Akce")),
"cislo_uctu": clean(row.get("Číslo účtu")), "cislo_uctu": clean(row.get("Číslo účtu")),
"id_operace": clean(row.get("ID operace")),
"id_pokynu": clean(row.get("ID pokynu")),
"protiucet": clean(row.get("Protiúčet")), "protiucet": clean(row.get("Protiúčet")),
"nazev_protiuctu": clean(row.get("Název protiúčtu")),
"kod_banky": clean(row.get("Kód banky")), "kod_banky": clean(row.get("Kód banky")),
"ks": clean(row.get("KS")), "ks": clean(row.get("KS")),
"vs": clean(row.get("VS")), "vs": clean(row.get("VS")),
"ss": clean(row.get("SS")), "ss": clean(row.get("SS")),
"zprava": clean(row.get("Zpráva pro příjemce")), "zprava_pro_prijemce": clean(row.get("Zpráva pro příjemce")),
"poznamka": clean(row.get("Poznámka")), "poznamka": clean(row.get("Poznámka")),
"reference_platce": clean(row.get("Reference plátce")),
"typ": clean(row.get("Typ")),
"upresneni": clean(row.get("Upřesnění")),
"zadal": clean(row.get("Zadal")),
"zdrojovy_ucet": clean(row.get("Zdrojový účet")),
"nazev_banky": clean(row.get("Název banky")),
} }
# --- fingerprint pro ladění duplicit --- cols = ", ".join(data.keys())
data["uniq_fp"] = build_fingerprint(data)
# --- INSERT (záměrně bez IGNORE, ať duplicitní řádky opravdu uvidíme) ---
placeholders = ", ".join(["%s"] * len(data)) placeholders = ", ".join(["%s"] * len(data))
sql = f"INSERT INTO `{TABLE_NAME}` ({', '.join(data.keys())}) VALUES ({placeholders})" sql = f"INSERT IGNORE INTO `{TABLE_NAME}` ({cols}) VALUES ({placeholders})"
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute(sql, list(data.values())) affected = cur.execute(sql, list(data.values()))
inserted += 1 if affected:
inserted += 1
else:
skipped += 1
print(f"✅ Import hotový: vloženo {inserted} řádků.") # --- progress output ---
if i % 500 == 0 or i == total_rows:
print(f" {i}/{total_rows} zpracováno... ({inserted} vloženo, {skipped} duplicit)")
# ----- rychlý report duplicit podle fingerprintu ----- # summary
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute(f""" cur.execute(f"SELECT COUNT(*) AS cnt FROM `{TABLE_NAME}`")
SELECT uniq_fp, COUNT(*) AS c total_db = cur.fetchone()["cnt"]
FROM `{TABLE_NAME}`
GROUP BY uniq_fp print(f"\n✅ Import dokončen: {inserted} nových, {skipped} duplicit přeskočeno.")
HAVING c > 1 print(f"📊 Celkem v databázi: {total_db} záznamů.")
ORDER BY c DESC
LIMIT 10;
""")
dups = cur.fetchall()
if dups:
print("\n⚠️ TOP 10 duplicitních fingerprintů (uniq_fp, count):")
for r in dups:
print(f" {r['uniq_fp']} -> {r['c']}")
print("\n💡 Tip: SELECT * FROM `transactions` WHERE uniq_fp = '...';")
else:
print("✅ Žádné duplicity podle uniq_fp nenalezeny.")
# ======== MAIN ======== # ======== MAIN ========
if __name__ == "__main__": if __name__ == "__main__":