#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FIO MULTI-ACCOUNT IMPORTER — FINAL VERSION WITH AUDIT
============================================================
Downloads transactions for several Fio accounts and stores them in MySQL.
After every inserted batch a verification query (audit) checks that the
transaction IDs of that batch are really present in the database.
"""

import io
import json
import sys
import time
from datetime import date, timedelta
from pathlib import Path
from typing import Any, Dict, List

import mysql.connector
import requests
from mysql.connector import Error

# ====================================================================
# A. KEPT ON PURPOSE: force UTF-8 on stdout/stderr so that diacritics are
#    printed correctly when the script runs as a scheduled task.
# ====================================================================
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')

# =========================================
# CONFIGURATION
# =========================================

# JSON file containing multiple account configs:
# [
#    { "name": "CZK rodina", "account_number": "2100046291", "token": "xxx" },
#    ...
# ]
ACCOUNTS_FILE = r"/accounts.json"

# Directory where raw JSON files from Fio API will be stored.
JSON_BASE_DIR = r"z:\Dropbox\!!!Days\Downloads Z230\Fio"

# MySQL connection parameters
# NOTE(review): credentials are hard-coded in source — consider moving them
# to an environment variable or a config file outside version control.
DB = {
    "host": "192.168.1.76",
    "port": 3307,
    "user": "root",
    "password": "Vlado9674+",
    "database": "fio",
    "charset": "utf8mb4",
}

# How many transactions insert per batch (performance tuning)
BATCH_SIZE = 500

# How many days back we load from Fio (default = last 90 days)
DAYS_BACK = 90

# Upsert statement matching the `transactions` table; rows are passed as dicts
# whose keys correspond to the named placeholders.
INSERT_SQL = """
    INSERT INTO transactions
        (id_operace, cislo_uctu, transaction_date, amount, currency,
         protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ,
         vs, ks, ss, uziv_identifikace, zprava_pro_prijemce,
         provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce)
    VALUES (%(id_operace)s, %(cislo_uctu)s, %(transaction_date)s, %(amount)s, %(currency)s,
            %(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s,
            %(vs)s, %(ks)s, %(ss)s, %(uziv_identifikace)s, %(zprava_pro_prijemce)s,
            %(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s,
            %(reference_platce)s)
    ON DUPLICATE KEY UPDATE
        cislo_uctu = VALUES (cislo_uctu),
        transaction_date = VALUES (transaction_date),
        amount = VALUES (amount),
        currency = VALUES (currency),
        protiucet = VALUES (protiucet),
        kod_banky = VALUES (kod_banky),
        nazev_protiuctu = VALUES (nazev_protiuctu),
        nazev_banky = VALUES (nazev_banky),
        typ = VALUES (typ),
        vs = VALUES (vs),
        ks = VALUES (ks),
        ss = VALUES (ss),
        uziv_identifikace = VALUES (uziv_identifikace),
        zprava_pro_prijemce = VALUES (zprava_pro_prijemce),
        provedl = VALUES (provedl),
        id_pokynu = VALUES (id_pokynu),
        komentar = VALUES (komentar),
        upr_objem_mena = VALUES (upr_objem_mena),
        api_bic = VALUES (api_bic),
        reference_platce = VALUES (reference_platce)
"""


# =========================================
# HELPERS
# =========================================

def load_accounts(path: str) -> List[Dict[str, str]]:
    """Read the accounts JSON file and validate its content.

    Raises:
        ValueError: if the top level is not a list, an entry is not an
            object, or an entry lacks "name", "account_number" or "token".
        OSError / json.JSONDecodeError: if the file cannot be read or parsed.
    """
    with open(path, "r", encoding="utf-8") as f:
        accounts = json.load(f)

    if not isinstance(accounts, list):
        raise ValueError("Account config must be a JSON list of account objects")

    for acc in accounts:
        if not isinstance(acc, dict):
            raise ValueError(f"Account config entry is not an object: {acc}")
        for key in ("name", "account_number", "token"):
            if key not in acc:
                raise ValueError(f"Missing '{key}' in account config: {acc}")

    return accounts


def fio_url_for_period(token: str, d_from: date, d_to: date) -> str:
    """Construct the URL of the Fio REST API "periods" endpoint."""
    from_str = d_from.strftime("%Y-%m-%d")
    to_str = d_to.strftime("%Y-%m-%d")
    return f"https://fioapi.fio.cz/v1/rest/periods/{token}/{from_str}/{to_str}/transactions.json"


def fetch_fio_json(token: str, d_from: date, d_to: date) -> Any:
    """Call the Fio API and return the decoded JSON, or None on any failure.

    Network errors, non-200 responses and undecodable bodies are reported on
    stdout and turned into None so the caller can skip just this account.
    """
    url = fio_url_for_period(token, d_from, d_to)

    try:
        resp = requests.get(url, timeout=30)
    except requests.RequestException as e:
        # Only the exception type is printed: the message usually contains
        # the full URL, and the URL contains the secret API token.
        print(f"  ❌ Chyba při volání Fio API: {type(e).__name__}", flush=True)
        return None

    if resp.status_code != 200:
        # Log the URL with the token masked so the secret does not leak into logs.
        masked_url = fio_url_for_period("***", d_from, d_to)
        print(f"  ❌ HTTP {resp.status_code} from Fio: {masked_url}", flush=True)
        return None

    try:
        return resp.json()
    except ValueError:
        # ValueError is the common base of json.JSONDecodeError and of the
        # decode errors raised by the various versions of requests.
        print("  ❌ Cannot decode JSON from Fio response", flush=True)
        return None


def safe_col(t: dict, n: int) -> Any:
    """Return the "value" of transaction column `columnN`, or None if absent/empty."""
    key = f"column{n}"
    val = t.get(key)
    if not val:
        return None
    return val.get("value")


def clean_date(dt_str: str) -> str:
    """Strip the timezone suffix from a Fio date string.

    "YYYY-MM-DD+HH:MM" → "YYYY-MM-DD". Returns None for empty/None input.
    """
    if not dt_str:
        return None
    return str(dt_str)[:10]


def ensure_dir(path: Path):
    """Create the directory (including parents) if it does not exist."""
    path.mkdir(parents=True, exist_ok=True)


def save_json_for_account(base_dir: str, account_cfg: dict, data: dict, d_from: date, d_to: date) -> Path:
    """Save the raw API response to `<base_dir>/<account>/<from>_to_<to>.json`.

    Returns the path of the written file.
    """
    acc_num_raw = account_cfg["account_number"]
    # "/" separates account number and bank code; it is not usable in a folder name.
    acc_folder_name = acc_num_raw.replace("/", "_")

    out_dir = Path(base_dir) / acc_folder_name
    ensure_dir(out_dir)

    filename = f"{d_from.strftime('%Y-%m-%d')}_to_{d_to.strftime('%Y-%m-%d')}.json"
    out_path = out_dir / filename

    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

    return out_path


def extract_transactions(data: dict) -> List[dict]:
    """Return the list of transactions from a Fio statement JSON tree.

    Missing or null intermediate nodes yield an empty list; a single
    transaction delivered as a bare object is wrapped into a list.
    """
    statement = data.get("accountStatement") or {}
    tx_container = statement.get("transactionList") or {}
    transactions = tx_container.get("transaction") or []
    if isinstance(transactions, dict):
        transactions = [transactions]
    return transactions


def transaction_to_row(t: dict, fio_acc_id: Any) -> Dict[str, Any]:
    """Map one Fio transaction (columnN objects) to a row dict for INSERT_SQL.

    The two ID columns are converted to strings; everything else is passed
    through as delivered by the API.
    """
    id_operace_val = safe_col(t, 22)
    id_pokynu_val = safe_col(t, 17)

    return {
        "id_operace": str(id_operace_val) if id_operace_val is not None else None,
        "cislo_uctu": fio_acc_id,
        "transaction_date": clean_date(safe_col(t, 0)),
        "amount": safe_col(t, 1),
        "currency": safe_col(t, 14),
        "typ": safe_col(t, 8),
        "provedl": safe_col(t, 9),

        "protiucet": safe_col(t, 2),
        "kod_banky": safe_col(t, 3),
        "nazev_protiuctu": safe_col(t, 10),
        "nazev_banky": safe_col(t, 12),
        "api_bic": safe_col(t, 26),

        "vs": safe_col(t, 5),
        "ks": safe_col(t, 4),
        "ss": safe_col(t, 6),

        "zprava_pro_prijemce": safe_col(t, 16),
        "uziv_identifikace": safe_col(t, 7),
        "komentar": safe_col(t, 25),
        "upr_objem_mena": safe_col(t, 18),
        "id_pokynu": str(id_pokynu_val) if id_pokynu_val is not None else None,
        "reference_platce": safe_col(t, 27),
    }


def build_audit_query(id_count: int) -> str:
    """Return the audit SELECT with `id_count` positional `%s` placeholders."""
    placeholders = ", ".join(["%s"] * id_count)
    return f"SELECT COUNT(*) FROM transactions WHERE id_operace IN ({placeholders})"


def insert_rows_with_audit(cur, conn, rows: List[Dict[str, Any]], batch_size: int = BATCH_SIZE) -> int:
    """Upsert `rows` in batches and verify each batch with a follow-up query.

    Returns the number of rows confirmed by the audit. A database error rolls
    back the failing batch and stops processing the remaining batches.
    """
    confirmed = 0

    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]

        # Distinct non-null IDs of this batch: COUNT(*) over `IN (...)` can
        # never exceed the number of distinct IDs, so this — not len(chunk) —
        # is the value the audit must compare against.
        chunk_ids = list(dict.fromkeys(
            row["id_operace"] for row in chunk if row["id_operace"] is not None
        ))

        try:
            # Step 1: write the batch
            cur.executemany(INSERT_SQL, chunk)
            conn.commit()

            # Step 2: audit — read back how many of the IDs exist
            if chunk_ids:
                # IDs come from an external API, so they are bound as
                # parameters instead of being interpolated into the SQL text.
                cur.execute(build_audit_query(len(chunk_ids)), chunk_ids)
                found_count = cur.fetchone()[0]

                if found_count != len(chunk_ids):
                    print(
                        f"  ⚠ AUDIT SELHAL: Zapsáno {len(chunk_ids)}, ale v databázi nalezeno jen {found_count} pro tuto dávku!",
                        flush=True)

                confirmed += found_count
            else:
                # Batch without any IDs (should not happen): nothing to audit.
                confirmed += len(chunk)

        except Error as e:
            print(f"  ❌ Chyba při zápisu do DB: {e}", flush=True)
            conn.rollback()
            break  # stop on a critical error

    return confirmed


def process_account(acc: Dict[str, str], cur, conn, d_from: date, d_to: date) -> int:
    """Download, archive and import one account; return the audited row count."""
    name = acc["name"]
    cfg_acc_num = acc["account_number"]
    token = acc["token"]

    print(f"--- Účet: {name} ({cfg_acc_num}) ---", flush=True)
    t0 = time.time()

    # --- 1) Download JSON from Fio API
    data = fetch_fio_json(token, d_from, d_to)
    if data is None:
        print("  Přeskakuji, žádná data / chyba API.\n", flush=True)
        return 0

    # --- 2) Save raw JSON file to disk (best effort: a failure here must
    #        not prevent the database import)
    try:
        json_path = save_json_for_account(JSON_BASE_DIR, acc, data, d_from, d_to)
        print(f"  JSON uložen do: {json_path}", flush=True)
    except Exception as e:
        print(f"  ❌ Chyba při ukládání JSON souboru: {e}", flush=True)

    # --- 3) Extract transactions from JSON tree
    tlist = extract_transactions(data)
    print(f"  Počet transakcí v období: {len(tlist)}", flush=True)

    if not tlist:
        print("  Žádné transakce, jdu dál.\n", flush=True)
        return 0

    statement = data.get("accountStatement") or {}
    fio_acc_id = (statement.get("info") or {}).get("accountId")

    # Compare only the part before "/" (the config may include the bank code).
    if cfg_acc_num and fio_acc_id and cfg_acc_num.split("/")[0] not in fio_acc_id:
        print(f"  ⚠ Upozornění: accountId z Fio ({fio_acc_id}) "
              f"se neshoduje s account_number v konfiguraci ({cfg_acc_num})", flush=True)

    # --- 4) Build list of MySQL rows
    rows = [transaction_to_row(t, fio_acc_id) for t in tlist]

    # --- 5) INSERT rows into MySQL in batches, with audit
    actual_inserted_count = insert_rows_with_audit(cur, conn, rows)

    elapsed = time.time() - t0
    print(f"  ✓ Zapsáno (potvrzeno auditem): {actual_inserted_count} řádků do DB za {elapsed:.2f} s\n", flush=True)

    return actual_inserted_count


# =========================================
# MAIN IMPORT LOGIC
# =========================================

def main():
    """Import the last DAYS_BACK days of transactions for every configured account."""
    start_all = time.time()

    # Calculate time range (last N days)
    today = date.today()
    d_from = today - timedelta(days=DAYS_BACK)
    d_to = today

    print("=== Fio multi-account import v2 (NOVÝ KONEKTOR + AUDIT) ===", flush=True)
    print(f"Období: {d_from} až {d_to}", flush=True)

    # Load all accounts from accounts.json
    try:
        accounts = load_accounts(ACCOUNTS_FILE)
    except Exception as e:
        print(f"FATÁLNÍ CHYBA při načítání účtů: {e}", flush=True)
        return

    print(f"  Účtů v konfiguraci: {len(accounts)}\n", flush=True)

    # Connect via mysql.connector
    try:
        conn = mysql.connector.connect(
            host=DB["host"],
            port=DB["port"],
            user=DB["user"],
            password=DB["password"],
            database=DB["database"],
            charset=DB["charset"]
        )
        cur = conn.cursor()
    except Error as e:
        print(f"FATÁLNÍ CHYBA při připojení k DB: {e}", flush=True)
        return

    total_inserted = 0

    # ======================================================
    # PROCESS EACH ACCOUNT IN accounts.json
    # ======================================================
    try:
        for acc in accounts:
            # Sum the counts confirmed by the audit
            total_inserted += process_account(acc, cur, conn, d_from, d_to)
    finally:
        # Close DB even if an unexpected exception aborts the loop
        cur.close()
        conn.close()

    total_elapsed = time.time() - start_all

    print(f"=== Hotovo. Celkem zapsáno (potvrzeno auditem) {total_inserted} transakcí. "
          f"Celkový čas: {total_elapsed:.2f} s ===", flush=True)


# ======================================================
# ENTRY POINT
# ======================================================

if __name__ == "__main__":
    main()