#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ FIO JSON ARCHIVE → MYSQL IMPORTER (ONE-TIME REIMPORT) ===================================================== CO TOHLE DĚLÁ ------------- Tento skript je určený pro JEDNORÁZOVÝ import historických transakcí, které už máš uložené jako RAW JSON soubory na disku (archiv). ✅ NEVOLÁ Fio API (žádné stahování) ✅ Rekurzivně projde celý adresář: U:\Dropbox\!!!Days\Downloads Z230\Fio\ a najde všechny *.json soubory (typicky ve složkách podle účtu) ✅ Z každého JSONu: - vytáhne accountId (ID účtu z JSON hlavičky) - vytáhne transaction list - mapuje Fio "columnN" hodnoty na sloupce tabulky v MySQL ✅ Poté vkládá data do MySQL tabulky: fio.transactions IDEMPOTENTNÍ CHOVÁNÍ (DŮLEŽITÉ) ------------------------------ Tabulka má primární klíč: PRIMARY KEY (cislo_uctu, id_operace) Proto je INSERT napsaný jako: INSERT ... ON DUPLICATE KEY UPDATE ... To znamená: - pokud (cislo_uctu, id_operace) ještě v DB není → vloží nový řádek - pokud už tam je → neudělá duplicitu, ale může aktualizovat vybraná pole Takže skript můžeš spustit opakovaně bez rizika duplikací. CO MUSÍ EXISTOVAT ----------------- 1) MySQL DB: fio 2) Tabulka: fio.transactions (podle DDL co jsme skládali z ibd2sdi SDI) VÝKON ----- - Používá executemany() a dávky (BATCH_SIZE), aby to bylo rychlé - Commituje po dávkách POZNÁMKA K CHYBÁM ----------------- - Pokud narazí na rozbitý JSON nebo neočekávanou strukturu, soubor přeskočí a pokračuje dál (aby jednorázový import doběhl celý). """ import json from pathlib import Path import pymysql # ====================================================== # CONFIG # ====================================================== # Základní adresář s archivními JSON soubory z Fio API # Očekáváme strukturu například: # U:\Dropbox\!!!Days\Downloads Z230\Fio\2100046291_2010\2025-01-01_to_2025-03-31.json JSON_BASE_DIR = Path(r"u:\Dropbox\!!!Days\Downloads Z230\Fio") # Připojení do MySQL (pozor: port 3307 dle tvého prostředí) DB = { "host": "192.168.1.76", "port": 3306, "user": "root", "password": "Vlado9674+", "database": "fio", "charset": "utf8mb4", } # Kolik řádků posílat do DB v jedné dávce (performance) BATCH_SIZE = 500 # ====================================================== # HELPERS # ====================================================== def safe_col(tx: dict, n: int): """ Bezpečné čtení hodnoty Fio JSON "columnN". Fio JSON pro jednu transakci vypadá typicky takto: { "column0": { "name": "Datum", "value": "2025-02-14+0100" }, "column1": { "name": "Objem", "value": 123.45 }, ... } Ale: - některé columnN vůbec nemusí existovat - nebo mohou být None Tato funkce tedy: vrátí tx["columnN"]["value"] nebo: None (pokud tam columnN není) """ obj = tx.get(f"column{n}") if not obj: return None return obj.get("value") def clean_date(dt_str): """ Fio vrací datum často v podobě: "YYYY-MM-DD+0100" MySQL DATE chceme jen: "YYYY-MM-DD" Pokud je dt_str None/empty, vrátí None. """ if not dt_str: return None return str(dt_str)[:10] def as_list(maybe_list_or_dict): """ Fio někdy vrací: - seznam transakcí (list) jindy: - jedinou transakci jako dict Tady zaručíme, že vždy pracujeme se seznamem. """ if maybe_list_or_dict is None: return [] if isinstance(maybe_list_or_dict, dict): return [maybe_list_or_dict] return list(maybe_list_or_dict) # ====================================================== # SQL (MAPOVÁNÍ NA TVOU TABULKU) # ====================================================== # Vkládáme do tabulky fio.transactions (sloupce dle DDL z SDI). # Pozn.: sloupec stazeno_kdy neuvádíme -> vyplní se automaticky DEFAULT CURRENT_TIMESTAMP SQL_INSERT = """ INSERT INTO transactions ( id_operace, cislo_uctu, transaction_date, amount, currency, protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ, vs, ks, ss, zprava_pro_prijemce, uziv_identifikace, provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce ) VALUES ( %(id_operace)s, %(cislo_uctu)s, %(transaction_date)s, %(amount)s, %(currency)s, %(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s, %(vs)s, %(ks)s, %(ss)s, %(zprava_pro_prijemce)s, %(uziv_identifikace)s, %(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s, %(reference_platce)s ) ON DUPLICATE KEY UPDATE -- když už transakce existuje, aktualizujeme vybraná pole transaction_date = VALUES(transaction_date), amount = VALUES(amount), currency = VALUES(currency), protiucet = VALUES(protiucet), kod_banky = VALUES(kod_banky), nazev_protiuctu = VALUES(nazev_protiuctu), nazev_banky = VALUES(nazev_banky), typ = VALUES(typ), vs = VALUES(vs), ks = VALUES(ks), ss = VALUES(ss), zprava_pro_prijemce= VALUES(zprava_pro_prijemce), uziv_identifikace = VALUES(uziv_identifikace), provedl = VALUES(provedl), id_pokynu = VALUES(id_pokynu), komentar = VALUES(komentar), upr_objem_mena = VALUES(upr_objem_mena), api_bic = VALUES(api_bic), reference_platce = VALUES(reference_platce) """ # ====================================================== # MAIN # ====================================================== def main(): """ Hlavní běh: 1) Najde všechny *.json soubory pod JSON_BASE_DIR (rekurzivně) 2) Každý JSON načte a zkusí vytáhnout: - accountId - transactionList.transaction 3) Pro každou transakci složí dict hodnot podle SQL_INSERT 4) Vkládá do DB po dávkách (BATCH_SIZE) přes executemany() """ print("=== FIO JSON ARCHIVE → MYSQL IMPORTER ===") print(f"Base dir: {JSON_BASE_DIR}") # 1) Najdi všechny JSON soubory rekurzivně json_files = sorted(JSON_BASE_DIR.rglob("*.json")) print(f"Nalezeno JSON souborů: {len(json_files)}") if not json_files: print("Nic k importu. Končím.") return # 2) Připoj se k DB conn = pymysql.connect(**DB) cur = conn.cursor() total_rows_seen = 0 # kolik transakcí jsme celkem naparsovali (všech souborů) total_rows_sent = 0 # kolik jsme celkem poslali do DB (insert/update attempt) files_ok = 0 files_skipped = 0 try: # 3) Zpracuj každý JSON soubor for idx, jf in enumerate(json_files, start=1): print(f"\n[{idx}/{len(json_files)}] Soubor: {jf}") # 3a) načtení JSON souboru try: data = json.loads(jf.read_text(encoding="utf-8")) except Exception as e: print(f" ❌ Nejde načíst JSON ({e}) → přeskočeno") files_skipped += 1 continue # 3b) vytažení listu transakcí a accountId try: fio_acc_id = data["accountStatement"]["info"]["accountId"] t_raw = data["accountStatement"]["transactionList"].get("transaction", []) except Exception: print(" ❌ Neočekávaná struktura JSON (chybí accountStatement/info/transactionList) → přeskočeno") files_skipped += 1 continue tlist = as_list(t_raw) print(f" Účet: {fio_acc_id} | transakcí v souboru: {len(tlist)}") if not tlist: print(" (prázdný seznam transakcí) → OK, nic nevkládám") files_ok += 1 continue # 3c) naparsuj všechny transakce v souboru do listu rows rows = [] for tx in tlist: # mapování columnN -> DB sloupce (dle tvé tabulky) row = { "id_operace": safe_col(tx, 22), "cislo_uctu": fio_acc_id, "transaction_date": clean_date(safe_col(tx, 0)), "amount": safe_col(tx, 1), "currency": safe_col(tx, 14), "protiucet": safe_col(tx, 2), "kod_banky": safe_col(tx, 3), "nazev_protiuctu": safe_col(tx, 10), "nazev_banky": safe_col(tx, 15), "typ": safe_col(tx, 8), "vs": safe_col(tx, 5), "ks": safe_col(tx, 4), "ss": safe_col(tx, 6), "zprava_pro_prijemce": safe_col(tx, 16), # v SDI tabulce existuje sloupec uziv_identifikace (TEXT) # ve starém importeru se brala hodnota z column12 "uziv_identifikace": safe_col(tx, 12), # Pozor: ve tvé tabulce je "provedl" (VARCHAR(50)). # Starý importer bral "provedl" typicky z column9. "provedl": safe_col(tx, 9), "id_pokynu": safe_col(tx, 19), # textová poznámka / komentář (starý importer používal column25 jako "poznamka") "komentar": safe_col(tx, 25), # upřesnění objemu/měny (column20 / column21) # v tabulce máme jen jeden sloupec upr_objem_mena (varchar 255) # a zvlášť "currency" už je column14, takže sem dáváme column20 (Upřesnění) "upr_objem_mena": safe_col(tx, 20), # BIC a reference plátce (dle komentářů v SDI) "api_bic": safe_col(tx, 26), "reference_platce": safe_col(tx, 27), } # Minimální sanity check: # PRIMARY KEY je (cislo_uctu, id_operace), # takže pokud id_operace chybí, nebude to unikátní transakce. # Takové řádky přeskočíme (jinak by to dělalo problémy). if not row["id_operace"]: continue rows.append(row) total_rows_seen += len(tlist) print(f" Naparsováno validních řádků (s id_operace): {len(rows)}") if not rows: print(" (po filtraci nic nezbylo) → OK") files_ok += 1 continue # 3d) insert do DB po dávkách inserted_attempt = 0 for i in range(0, len(rows), BATCH_SIZE): chunk = rows[i:i + BATCH_SIZE] cur.executemany(SQL_INSERT, chunk) conn.commit() inserted_attempt += len(chunk) total_rows_sent += inserted_attempt files_ok += 1 print(f" ✓ Odesláno do DB (insert/update attempt): {inserted_attempt}") finally: cur.close() conn.close() print("\n=== HOTOVO ===") print(f"Souborů OK: {files_ok}") print(f"Souborů přeskočeno: {files_skipped}") print(f"Celkem transakcí nalezeno v JSON (raw): {total_rows_seen}") print(f"Celkem posláno do DB (insert/update attempt): {total_rows_sent}") print("Pozn.: 'posláno do DB' neznamená čisté INSERTy — část mohla být UPDATE (ON DUPLICATE KEY).") if __name__ == "__main__": main()