Files
fio/#10 Download reports/Trash/10 Read s auditem.py
2025-12-07 07:11:12 +01:00

354 lines
13 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
import time
from datetime import date, timedelta
from pathlib import Path
import json
import requests
import mysql.connector
from mysql.connector import Error
from typing import Dict, Any, List
# ====================================================================
# A. KEPT: force UTF-8 so diacritics print correctly in scheduled tasks
# ====================================================================
# Re-wrap the raw byte buffers behind stdout/stderr as UTF-8 text streams;
# errors='replace' substitutes unencodable characters instead of raising.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
# NOTE(review): the string below comes after the imports and the statements
# above, so it is a plain expression statement and NOT the module docstring.
# English meaning: "FIO multi-account importer - final version with audit.
# Adds a read-back check (audit) after every batch which verifies that the
# transaction IDs were really written to the database."
"""
FIO MULTIACCOUNT IMPORTER — KONEČNÁ VERZE S AUDITEM
============================================================
Přidána zpětná kontrola (audit) po každé dávce, která ověřuje,
že se ID transakcí skutečně zapsaly do databáze.
"""
# =========================================
# CONFIGURATION
# =========================================
# Path of the JSON file that lists the accounts to import. Expected layout:
# [
#    { "name": "CZK rodina", "account_number": "2100046291", "token": "xxx" },
#    ...
# ]
ACCOUNTS_FILE = r"/accounts.json"

# Root folder under which the raw JSON responses from the Fio API are archived
# (one sub-folder per account).
JSON_BASE_DIR = r"z:\Dropbox\!!!Days\Downloads Z230\Fio"

# Keyword values handed to mysql.connector.connect().
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a file kept out of version control.
DB = {
    "host": "192.168.1.76",
    "port": 3307,
    "user": "root",
    "password": "Vlado9674+",
    "database": "fio",
    "charset": "utf8mb4",
}

# Number of transactions written per executemany() batch (performance tuning).
BATCH_SIZE = 500

# Length of the import window in days, counted back from today.
DAYS_BACK = 90
# =========================================
# HELPERS (unchanged)
# =========================================
def load_accounts(path: str) -> List[Dict[str, str]]:
    """Read the accounts JSON file and validate its structure.

    Args:
        path: Location of a UTF-8 JSON file holding a list of account
            objects, each with the keys "name", "account_number" and "token".

    Returns:
        The parsed list of account dictionaries, unchanged.

    Raises:
        OSError: The file cannot be opened.
        ValueError: The content is not valid JSON (json.JSONDecodeError is a
            ValueError), is not a list, contains a non-object entry, or an
            entry lacks one of the required keys.
    """
    required_keys = ("name", "account_number", "token")

    with open(path, "r", encoding="utf-8") as f:
        accounts = json.load(f)

    # Without this check a top-level object would be iterated key by key and
    # produce a misleading "missing key" error based on substring matching.
    if not isinstance(accounts, list):
        raise ValueError(
            f"Account config must be a JSON list, got {type(accounts).__name__}"
        )

    for acc in accounts:
        if not isinstance(acc, dict):
            raise ValueError(
                f"Account config entry must be a JSON object, got {type(acc).__name__}"
            )
        for key in required_keys:
            if key not in acc:
                # The token is a credential; never echo it into logs.
                shown = {k: ("***" if k == "token" else v) for k, v in acc.items()}
                raise ValueError(f"Missing '{key}' in account config: {shown}")
    return accounts
def fio_url_for_period(token: str, d_from: date, d_to: date) -> str:
    """Build the Fio REST "periods" URL for the given token and date range.

    Both dates are rendered as YYYY-MM-DD and placed in the URL path after
    the token, followed by "transactions.json".
    """
    start = format(d_from, "%Y-%m-%d")
    end = format(d_to, "%Y-%m-%d")
    return f"https://fioapi.fio.cz/v1/rest/periods/{token}/{start}/{end}/transactions.json"
def fetch_fio_json(token: str, d_from: date, d_to: date) -> Any:
    """Download the transaction statement for one token from the Fio API.

    Args:
        token: Fio API token; it becomes part of the request URL.
        d_from: First day of the requested period.
        d_to: Last day of the requested period.

    Returns:
        The decoded JSON payload, or None when the request fails, the server
        answers with a status other than 200, or the body is not valid JSON.
        Every failure is reported on stdout; nothing is raised for them.
    """
    url = fio_url_for_period(token, d_from, d_to)

    def _mask(text: str) -> str:
        # The token is a credential embedded in the URL; keep it out of logs.
        return text.replace(token, "***") if token else text

    try:
        resp = requests.get(url, timeout=30)
    except requests.RequestException as e:
        # Timeouts / connection errors must not abort the import of the
        # remaining accounts. The exception text may quote the URL, so mask it.
        print(f"  ❌ Request to Fio failed: {_mask(str(e))}", flush=True)
        return None

    if resp.status_code != 200:
        print(f"  ❌ HTTP {resp.status_code} from Fio: {_mask(url)}", flush=True)
        return None

    try:
        return resp.json()
    except ValueError:
        # ValueError is the common base of json.JSONDecodeError and of the
        # JSON decode error raised by requests itself.
        print("  ❌ Cannot decode JSON from Fio response", flush=True)
        return None
def safe_col(t: dict, n: int) -> Any:
    """Return the "value" of entry "column<n>" in a Fio transaction dict.

    Yields None when the entry is absent or falsy (e.g. null); otherwise the
    result of looking up "value" in the entry.
    """
    cell = t.get(f"column{n}")
    return cell.get("value") if cell else None
def clean_date(dt_str: str) -> str:
    """Keep only the leading "YYYY-MM-DD" part of a Fio date string.

    The input is converted with str() and cut to its first ten characters,
    which drops a trailing timezone offset. A falsy input (None, "") gives
    None.
    """
    return str(dt_str)[:10] if dt_str else None
def ensure_dir(path: Path):
    """Create *path* together with any missing parents; an existing directory is accepted."""
    path.mkdir(exist_ok=True, parents=True)
def save_json_for_account(base_dir: str, account_cfg: dict, data: dict, d_from: date, d_to: date) -> Path:
    """Write the raw API payload to disk and return the file path.

    The file is placed in a sub-folder of *base_dir* named after the account
    number (with "/" replaced by "_") and is called
    "<from>_to_<to>.json", both dates formatted as YYYY-MM-DD. The folder is
    created when missing. The JSON is written as UTF-8, non-ASCII characters
    unescaped, indented by two spaces.
    """
    folder_name = account_cfg["account_number"].replace("/", "_")
    target_dir = Path(base_dir) / folder_name
    ensure_dir(target_dir)

    target = target_dir / f"{d_from:%Y-%m-%d}_to_{d_to:%Y-%m-%d}.json"
    with target.open("w", encoding="utf-8") as fh:
        json.dump(data, fh, ensure_ascii=False, indent=2)
    return target
# =========================================
# MAIN IMPORT LOGIC
# =========================================
# Upsert statement for one transaction row. The named placeholders match the
# keys of the dict produced by _build_row(); an existing row with the same
# unique key gets all of its other columns overwritten.
_INSERT_SQL = """
INSERT INTO transactions
(id_operace, cislo_uctu, transaction_date, amount, currency, \
protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ, \
vs, ks, ss, uziv_identifikace, zprava_pro_prijemce, \
provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce)
VALUES (%(id_operace)s, %(cislo_uctu)s, %(transaction_date)s, %(amount)s, %(currency)s,
%(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s,
%(vs)s, %(ks)s, %(ss)s, %(uziv_identifikace)s, %(zprava_pro_prijemce)s,
%(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s, \
%(reference_platce)s) ON DUPLICATE KEY \
UPDATE \
cislo_uctu = \
VALUES (cislo_uctu), transaction_date = \
VALUES (transaction_date), amount = \
VALUES (amount), currency = \
VALUES (currency), protiucet = \
VALUES (protiucet), kod_banky = \
VALUES (kod_banky), nazev_protiuctu = \
VALUES (nazev_protiuctu), nazev_banky = \
VALUES (nazev_banky), typ = \
VALUES (typ), vs = \
VALUES (vs), ks = \
VALUES (ks), ss = \
VALUES (ss), uziv_identifikace = \
VALUES (uziv_identifikace), zprava_pro_prijemce = \
VALUES (zprava_pro_prijemce), provedl = \
VALUES (provedl), id_pokynu = \
VALUES (id_pokynu), komentar = \
VALUES (komentar), upr_objem_mena = \
VALUES (upr_objem_mena), api_bic = \
VALUES (api_bic), reference_platce = \
VALUES (reference_platce) \
"""


def _build_row(t: dict, fio_acc_id: Any) -> Dict[str, Any]:
    """Map one Fio transaction dict onto the named parameters of _INSERT_SQL."""
    id_operace_val = safe_col(t, 22)
    id_pokynu_val = safe_col(t, 17)
    return {
        # The two identifiers are stored as strings; None stays None.
        "id_operace": str(id_operace_val) if id_operace_val is not None else None,
        "cislo_uctu": fio_acc_id,
        "transaction_date": clean_date(safe_col(t, 0)),
        "amount": safe_col(t, 1),
        "currency": safe_col(t, 14),
        "typ": safe_col(t, 8),
        "provedl": safe_col(t, 9),
        "protiucet": safe_col(t, 2),
        "kod_banky": safe_col(t, 3),
        "nazev_protiuctu": safe_col(t, 10),
        "nazev_banky": safe_col(t, 12),
        "api_bic": safe_col(t, 26),
        "vs": safe_col(t, 5),
        "ks": safe_col(t, 4),
        "ss": safe_col(t, 6),
        "zprava_pro_prijemce": safe_col(t, 16),
        "uziv_identifikace": safe_col(t, 7),
        "komentar": safe_col(t, 25),
        "upr_objem_mena": safe_col(t, 18),
        "id_pokynu": str(id_pokynu_val) if id_pokynu_val is not None else None,
        "reference_platce": safe_col(t, 27),
    }


def _count_stored_ids(cur, ids: List[str]) -> int:
    """Return how many rows of `transactions` carry one of *ids* as id_operace."""
    # One %s per ID: the values travel as query parameters instead of being
    # pasted into the SQL text, so quotes in an ID cannot break the statement.
    placeholders = ", ".join(["%s"] * len(ids))
    cur.execute(
        f"SELECT COUNT(*) FROM transactions WHERE id_operace IN ({placeholders})",
        tuple(ids),
    )
    return cur.fetchone()[0]


def _import_account(conn, cur, acc: Dict[str, str], d_from: date, d_to: date) -> int:
    """Download, archive and store the transactions of a single account.

    Returns the number of rows the audit confirmed as present in the
    database (0 when the account was skipped).
    """
    name = acc["name"]
    cfg_acc_num = acc["account_number"]
    token = acc["token"]
    print(f"--- Účet: {name} ({cfg_acc_num}) ---", flush=True)
    t0 = time.time()

    # --- 1) Download JSON from Fio API
    data = fetch_fio_json(token, d_from, d_to)
    if data is None:
        print("  Přeskakuji, žádná data / chyba API.\n", flush=True)
        return 0

    # --- 2) Save raw JSON file to disk (best effort: a failed archive write
    #        is reported but does not stop the database import)
    try:
        json_path = save_json_for_account(JSON_BASE_DIR, acc, data, d_from, d_to)
        print(f"  JSON uložen do: {json_path}", flush=True)
    except Exception as e:
        print(f"  ❌ Chyba při ukládání JSON souboru: {e}", flush=True)

    # --- 3) Extract transactions from the JSON tree; "or {}" also covers
    #        nodes that are present but null
    statement = data.get("accountStatement") or {}
    tlist = (statement.get("transactionList") or {}).get("transaction")
    if tlist is None:
        tlist = []
    elif isinstance(tlist, dict):
        # A single transaction may arrive as a bare object instead of a list.
        tlist = [tlist]
    print(f"  Počet transakcí v období: {len(tlist)}", flush=True)
    if not tlist:
        print("  Žádné transakce, jdu dál.\n", flush=True)
        return 0

    fio_acc_id = (statement.get("info") or {}).get("accountId")
    # Warn when the configured number (part before "/") is not contained in
    # the account id reported by the API.
    if cfg_acc_num and fio_acc_id and cfg_acc_num.split("/")[0] not in str(fio_acc_id):
        print(f"  ⚠ Upozornění: accountId z Fio ({fio_acc_id}) "
              f"se neshoduje s account_number v konfiguraci ({cfg_acc_num})", flush=True)

    # --- 4) Build the list of rows for MySQL
    rows = [_build_row(t, fio_acc_id) for t in tlist]

    # --- 5) Insert rows in batches, auditing each batch after the commit
    confirmed = 0  # number of rows confirmed by the audit
    for start in range(0, len(rows), BATCH_SIZE):
        chunk = rows[start:start + BATCH_SIZE]
        # Distinct non-null IDs of this batch; these are what the audit can verify.
        chunk_ids = sorted({row["id_operace"] for row in chunk if row["id_operace"] is not None})
        try:
            cur.executemany(_INSERT_SQL, chunk)
            conn.commit()

            if chunk_ids:
                found_count = _count_stored_ids(cur, chunk_ids)
                # Compare against the IDs actually queried, not len(chunk):
                # rows without an ID or repeated IDs would otherwise raise a
                # false alarm. Assumes id_operace is unique in the table — TODO confirm.
                if found_count != len(chunk_ids):
                    print(
                        f"  ⚠ AUDIT SELHAL: Zapsáno {len(chunk_ids)}, ale v databázi nalezeno jen {found_count} pro tuto dávku!",
                        flush=True)
                confirmed += found_count
            else:
                # No ID in the batch means there is nothing to verify.
                confirmed += len(chunk)
        except Error as e:
            print(f"  ❌ Chyba při zápisu do DB: {e}", flush=True)
            conn.rollback()
            break  # stop this account after a database error

    elapsed = time.time() - t0
    print(f"  ✓ Zapsáno (potvrzeno auditem): {confirmed} řádků do DB za {elapsed:.2f} s\n", flush=True)
    return confirmed


def main() -> None:
    """Import the last DAYS_BACK days of transactions for every configured account.

    Loads the account list from ACCOUNTS_FILE, connects to MySQL with the
    settings in DB, then processes the accounts one by one: download from the
    Fio API, archive the raw JSON, upsert the rows in batches and audit each
    batch. Progress and errors are printed to stdout. A failure to load the
    accounts or to connect to the database ends the run early; the function
    returns None in every case.
    """
    start_all = time.time()

    # Time range: the last DAYS_BACK days up to today.
    today = date.today()
    d_from = today - timedelta(days=DAYS_BACK)
    d_to = today
    print("=== Fio multi-account import v2 (NOVÝ KONEKTOR + AUDIT) ===", flush=True)
    print(f"Období: {d_from} → {d_to}", flush=True)

    try:
        accounts = load_accounts(ACCOUNTS_FILE)
    except Exception as e:
        print(f"FATÁLNÍ CHYBA při načítání účtů: {e}", flush=True)
        return
    print(f"  Účtů v konfiguraci: {len(accounts)}\n", flush=True)

    try:
        conn = mysql.connector.connect(
            host=DB["host"],
            port=DB["port"],
            user=DB["user"],
            password=DB["password"],
            database=DB["database"],
            charset=DB["charset"],
        )
        cur = conn.cursor()
    except Error as e:
        print(f"FATÁLNÍ CHYBA při připojení k DB: {e}", flush=True)
        return

    total_inserted = 0
    try:
        for acc in accounts:
            total_inserted += _import_account(conn, cur, acc, d_from, d_to)
    finally:
        # Release the database resources even when an account raises unexpectedly.
        cur.close()
        conn.close()

    total_elapsed = time.time() - start_all
    print(f"=== Hotovo. Celkem zapsáno (potvrzeno auditem) {total_inserted} transakcí. "
          f"Celkový čas: {total_elapsed:.2f} s ===", flush=True)
# ======================================================
# ENTRY POINT
# ======================================================
# Run the import only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()