notebook
This commit is contained in:
@@ -0,0 +1,382 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
"""
|
||||
FIO JSON ARCHIVE → MYSQL IMPORTER (ONE-TIME REIMPORT)
|
||||
=====================================================
|
||||
|
||||
CO TOHLE DĚLÁ
|
||||
-------------
|
||||
Tento skript je určený pro JEDNORÁZOVÝ import historických transakcí,
|
||||
které už máš uložené jako RAW JSON soubory na disku (archiv).
|
||||
|
||||
✅ NEVOLÁ Fio API (žádné stahování)
|
||||
✅ Rekurzivně projde celý adresář:
|
||||
U:\Dropbox\!!!Days\Downloads Z230\Fio\
|
||||
a najde všechny *.json soubory (typicky ve složkách podle účtu)
|
||||
|
||||
✅ Z každého JSONu:
|
||||
- vytáhne accountId (ID účtu z JSON hlavičky)
|
||||
- vytáhne transaction list
|
||||
- mapuje Fio "columnN" hodnoty na sloupce tabulky v MySQL
|
||||
|
||||
✅ Poté vkládá data do MySQL tabulky:
|
||||
fio.transactions
|
||||
|
||||
IDEMPOTENTNÍ CHOVÁNÍ (DŮLEŽITÉ)
|
||||
------------------------------
|
||||
Tabulka má primární klíč:
|
||||
PRIMARY KEY (cislo_uctu, id_operace)
|
||||
|
||||
Proto je INSERT napsaný jako:
|
||||
INSERT ... ON DUPLICATE KEY UPDATE ...
|
||||
|
||||
To znamená:
|
||||
- pokud (cislo_uctu, id_operace) ještě v DB není → vloží nový řádek
|
||||
- pokud už tam je → neudělá duplicitu, ale může aktualizovat vybraná pole
|
||||
|
||||
Takže skript můžeš spustit opakovaně bez rizika duplikací.
|
||||
|
||||
CO MUSÍ EXISTOVAT
|
||||
-----------------
|
||||
1) MySQL DB: fio
|
||||
2) Tabulka: fio.transactions (podle DDL co jsme skládali z ibd2sdi SDI)
|
||||
|
||||
VÝKON
|
||||
-----
|
||||
- Používá executemany() a dávky (BATCH_SIZE), aby to bylo rychlé
|
||||
- Commituje po dávkách
|
||||
|
||||
POZNÁMKA K CHYBÁM
|
||||
-----------------
|
||||
- Pokud narazí na rozbitý JSON nebo neočekávanou strukturu, soubor přeskočí
|
||||
a pokračuje dál (aby jednorázový import doběhl celý).
|
||||
|
||||
"""
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import pymysql
|
||||
|
||||
|
||||
# ======================================================
|
||||
# CONFIG
|
||||
# ======================================================
|
||||
|
||||
# Základní adresář s archivními JSON soubory z Fio API
|
||||
# Očekáváme strukturu například:
|
||||
# U:\Dropbox\!!!Days\Downloads Z230\Fio\2100046291_2010\2025-01-01_to_2025-03-31.json
|
||||
JSON_BASE_DIR = Path(r"u:\Dropbox\!!!Days\Downloads Z230\Fio")
|
||||
|
||||
# Připojení do MySQL (pozor: port 3307 dle tvého prostředí)
|
||||
DB = {
|
||||
"host": "192.168.1.76",
|
||||
"port": 3306,
|
||||
"user": "root",
|
||||
"password": "Vlado9674+",
|
||||
"database": "fio",
|
||||
"charset": "utf8mb4",
|
||||
}
|
||||
|
||||
# Kolik řádků posílat do DB v jedné dávce (performance)
|
||||
BATCH_SIZE = 500
|
||||
|
||||
|
||||
# ======================================================
|
||||
# HELPERS
|
||||
# ======================================================
|
||||
|
||||
def safe_col(tx: dict, n: int):
|
||||
"""
|
||||
Bezpečné čtení hodnoty Fio JSON "columnN".
|
||||
|
||||
Fio JSON pro jednu transakci vypadá typicky takto:
|
||||
{
|
||||
"column0": { "name": "Datum", "value": "2025-02-14+0100" },
|
||||
"column1": { "name": "Objem", "value": 123.45 },
|
||||
...
|
||||
}
|
||||
|
||||
Ale:
|
||||
- některé columnN vůbec nemusí existovat
|
||||
- nebo mohou být None
|
||||
|
||||
Tato funkce tedy:
|
||||
vrátí tx["columnN"]["value"]
|
||||
nebo:
|
||||
None (pokud tam columnN není)
|
||||
"""
|
||||
obj = tx.get(f"column{n}")
|
||||
if not obj:
|
||||
return None
|
||||
return obj.get("value")
|
||||
|
||||
|
||||
def clean_date(dt_str):
|
||||
"""
|
||||
Fio vrací datum často v podobě:
|
||||
"YYYY-MM-DD+0100"
|
||||
MySQL DATE chceme jen:
|
||||
"YYYY-MM-DD"
|
||||
|
||||
Pokud je dt_str None/empty, vrátí None.
|
||||
"""
|
||||
if not dt_str:
|
||||
return None
|
||||
return str(dt_str)[:10]
|
||||
|
||||
|
||||
def as_list(maybe_list_or_dict):
|
||||
"""
|
||||
Fio někdy vrací:
|
||||
- seznam transakcí (list)
|
||||
jindy:
|
||||
- jedinou transakci jako dict
|
||||
|
||||
Tady zaručíme, že vždy pracujeme se seznamem.
|
||||
"""
|
||||
if maybe_list_or_dict is None:
|
||||
return []
|
||||
if isinstance(maybe_list_or_dict, dict):
|
||||
return [maybe_list_or_dict]
|
||||
return list(maybe_list_or_dict)
|
||||
|
||||
|
||||
# ======================================================
|
||||
# SQL (MAPOVÁNÍ NA TVOU TABULKU)
|
||||
# ======================================================
|
||||
|
||||
# Vkládáme do tabulky fio.transactions (sloupce dle DDL z SDI).
|
||||
# Pozn.: sloupec stazeno_kdy neuvádíme -> vyplní se automaticky DEFAULT CURRENT_TIMESTAMP
|
||||
SQL_INSERT = """
|
||||
INSERT INTO transactions
|
||||
(
|
||||
id_operace,
|
||||
cislo_uctu,
|
||||
transaction_date,
|
||||
amount,
|
||||
currency,
|
||||
protiucet,
|
||||
kod_banky,
|
||||
nazev_protiuctu,
|
||||
nazev_banky,
|
||||
typ,
|
||||
vs,
|
||||
ks,
|
||||
ss,
|
||||
zprava_pro_prijemce,
|
||||
uziv_identifikace,
|
||||
provedl,
|
||||
id_pokynu,
|
||||
komentar,
|
||||
upr_objem_mena,
|
||||
api_bic,
|
||||
reference_platce
|
||||
)
|
||||
VALUES
|
||||
(
|
||||
%(id_operace)s,
|
||||
%(cislo_uctu)s,
|
||||
%(transaction_date)s,
|
||||
%(amount)s,
|
||||
%(currency)s,
|
||||
%(protiucet)s,
|
||||
%(kod_banky)s,
|
||||
%(nazev_protiuctu)s,
|
||||
%(nazev_banky)s,
|
||||
%(typ)s,
|
||||
%(vs)s,
|
||||
%(ks)s,
|
||||
%(ss)s,
|
||||
%(zprava_pro_prijemce)s,
|
||||
%(uziv_identifikace)s,
|
||||
%(provedl)s,
|
||||
%(id_pokynu)s,
|
||||
%(komentar)s,
|
||||
%(upr_objem_mena)s,
|
||||
%(api_bic)s,
|
||||
%(reference_platce)s
|
||||
)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
-- když už transakce existuje, aktualizujeme vybraná pole
|
||||
transaction_date = VALUES(transaction_date),
|
||||
amount = VALUES(amount),
|
||||
currency = VALUES(currency),
|
||||
protiucet = VALUES(protiucet),
|
||||
kod_banky = VALUES(kod_banky),
|
||||
nazev_protiuctu = VALUES(nazev_protiuctu),
|
||||
nazev_banky = VALUES(nazev_banky),
|
||||
typ = VALUES(typ),
|
||||
vs = VALUES(vs),
|
||||
ks = VALUES(ks),
|
||||
ss = VALUES(ss),
|
||||
zprava_pro_prijemce= VALUES(zprava_pro_prijemce),
|
||||
uziv_identifikace = VALUES(uziv_identifikace),
|
||||
provedl = VALUES(provedl),
|
||||
id_pokynu = VALUES(id_pokynu),
|
||||
komentar = VALUES(komentar),
|
||||
upr_objem_mena = VALUES(upr_objem_mena),
|
||||
api_bic = VALUES(api_bic),
|
||||
reference_platce = VALUES(reference_platce)
|
||||
"""
|
||||
|
||||
|
||||
# ======================================================
|
||||
# MAIN
|
||||
# ======================================================
|
||||
|
||||
def main():
|
||||
"""
|
||||
Hlavní běh:
|
||||
|
||||
1) Najde všechny *.json soubory pod JSON_BASE_DIR (rekurzivně)
|
||||
2) Každý JSON načte a zkusí vytáhnout:
|
||||
- accountId
|
||||
- transactionList.transaction
|
||||
3) Pro každou transakci složí dict hodnot podle SQL_INSERT
|
||||
4) Vkládá do DB po dávkách (BATCH_SIZE) přes executemany()
|
||||
"""
|
||||
|
||||
print("=== FIO JSON ARCHIVE → MYSQL IMPORTER ===")
|
||||
print(f"Base dir: {JSON_BASE_DIR}")
|
||||
|
||||
# 1) Najdi všechny JSON soubory rekurzivně
|
||||
json_files = sorted(JSON_BASE_DIR.rglob("*.json"))
|
||||
print(f"Nalezeno JSON souborů: {len(json_files)}")
|
||||
|
||||
if not json_files:
|
||||
print("Nic k importu. Končím.")
|
||||
return
|
||||
|
||||
# 2) Připoj se k DB
|
||||
conn = pymysql.connect(**DB)
|
||||
cur = conn.cursor()
|
||||
|
||||
total_rows_seen = 0 # kolik transakcí jsme celkem naparsovali (všech souborů)
|
||||
total_rows_sent = 0 # kolik jsme celkem poslali do DB (insert/update attempt)
|
||||
files_ok = 0
|
||||
files_skipped = 0
|
||||
|
||||
try:
|
||||
# 3) Zpracuj každý JSON soubor
|
||||
for idx, jf in enumerate(json_files, start=1):
|
||||
print(f"\n[{idx}/{len(json_files)}] Soubor: {jf}")
|
||||
|
||||
# 3a) načtení JSON souboru
|
||||
try:
|
||||
data = json.loads(jf.read_text(encoding="utf-8"))
|
||||
except Exception as e:
|
||||
print(f" ❌ Nejde načíst JSON ({e}) → přeskočeno")
|
||||
files_skipped += 1
|
||||
continue
|
||||
|
||||
# 3b) vytažení listu transakcí a accountId
|
||||
try:
|
||||
fio_acc_id = data["accountStatement"]["info"]["accountId"]
|
||||
t_raw = data["accountStatement"]["transactionList"].get("transaction", [])
|
||||
except Exception:
|
||||
print(" ❌ Neočekávaná struktura JSON (chybí accountStatement/info/transactionList) → přeskočeno")
|
||||
files_skipped += 1
|
||||
continue
|
||||
|
||||
tlist = as_list(t_raw)
|
||||
print(f" Účet: {fio_acc_id} | transakcí v souboru: {len(tlist)}")
|
||||
|
||||
if not tlist:
|
||||
print(" (prázdný seznam transakcí) → OK, nic nevkládám")
|
||||
files_ok += 1
|
||||
continue
|
||||
|
||||
# 3c) naparsuj všechny transakce v souboru do listu rows
|
||||
rows = []
|
||||
for tx in tlist:
|
||||
# mapování columnN -> DB sloupce (dle tvé tabulky)
|
||||
row = {
|
||||
"id_operace": safe_col(tx, 22),
|
||||
"cislo_uctu": fio_acc_id,
|
||||
|
||||
"transaction_date": clean_date(safe_col(tx, 0)),
|
||||
"amount": safe_col(tx, 1),
|
||||
"currency": safe_col(tx, 14),
|
||||
|
||||
"protiucet": safe_col(tx, 2),
|
||||
"kod_banky": safe_col(tx, 3),
|
||||
|
||||
"nazev_protiuctu": safe_col(tx, 10),
|
||||
"nazev_banky": safe_col(tx, 15),
|
||||
|
||||
"typ": safe_col(tx, 8),
|
||||
|
||||
"vs": safe_col(tx, 5),
|
||||
"ks": safe_col(tx, 4),
|
||||
"ss": safe_col(tx, 6),
|
||||
|
||||
"zprava_pro_prijemce": safe_col(tx, 16),
|
||||
|
||||
# v SDI tabulce existuje sloupec uziv_identifikace (TEXT)
|
||||
# ve starém importeru se brala hodnota z column12
|
||||
"uziv_identifikace": safe_col(tx, 12),
|
||||
|
||||
# Pozor: ve tvé tabulce je "provedl" (VARCHAR(50)).
|
||||
# Starý importer bral "provedl" typicky z column9.
|
||||
"provedl": safe_col(tx, 9),
|
||||
|
||||
"id_pokynu": safe_col(tx, 19),
|
||||
|
||||
# textová poznámka / komentář (starý importer používal column25 jako "poznamka")
|
||||
"komentar": safe_col(tx, 25),
|
||||
|
||||
# upřesnění objemu/měny (column20 / column21)
|
||||
# v tabulce máme jen jeden sloupec upr_objem_mena (varchar 255)
|
||||
# a zvlášť "currency" už je column14, takže sem dáváme column20 (Upřesnění)
|
||||
"upr_objem_mena": safe_col(tx, 20),
|
||||
|
||||
# BIC a reference plátce (dle komentářů v SDI)
|
||||
"api_bic": safe_col(tx, 26),
|
||||
"reference_platce": safe_col(tx, 27),
|
||||
}
|
||||
|
||||
# Minimální sanity check:
|
||||
# PRIMARY KEY je (cislo_uctu, id_operace),
|
||||
# takže pokud id_operace chybí, nebude to unikátní transakce.
|
||||
# Takové řádky přeskočíme (jinak by to dělalo problémy).
|
||||
if not row["id_operace"]:
|
||||
continue
|
||||
|
||||
rows.append(row)
|
||||
|
||||
total_rows_seen += len(tlist)
|
||||
print(f" Naparsováno validních řádků (s id_operace): {len(rows)}")
|
||||
|
||||
if not rows:
|
||||
print(" (po filtraci nic nezbylo) → OK")
|
||||
files_ok += 1
|
||||
continue
|
||||
|
||||
# 3d) insert do DB po dávkách
|
||||
inserted_attempt = 0
|
||||
for i in range(0, len(rows), BATCH_SIZE):
|
||||
chunk = rows[i:i + BATCH_SIZE]
|
||||
cur.executemany(SQL_INSERT, chunk)
|
||||
conn.commit()
|
||||
inserted_attempt += len(chunk)
|
||||
|
||||
total_rows_sent += inserted_attempt
|
||||
files_ok += 1
|
||||
print(f" ✓ Odesláno do DB (insert/update attempt): {inserted_attempt}")
|
||||
|
||||
finally:
|
||||
cur.close()
|
||||
conn.close()
|
||||
|
||||
print("\n=== HOTOVO ===")
|
||||
print(f"Souborů OK: {files_ok}")
|
||||
print(f"Souborů přeskočeno: {files_skipped}")
|
||||
print(f"Celkem transakcí nalezeno v JSON (raw): {total_rows_seen}")
|
||||
print(f"Celkem posláno do DB (insert/update attempt): {total_rows_sent}")
|
||||
print("Pozn.: 'posláno do DB' neznamená čisté INSERTy — část mohla být UPDATE (ON DUPLICATE KEY).")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user