@@ -0,0 +1,400 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
import time
from datetime import date , timedelta
from pathlib import Path
import json
import requests
import mysql . connector
from mysql . connector import Error
from typing import Dict , Any , List
import hashlib
# ====================================================================
# A. Vynucení UTF-8 pro správnou diakritiku v plánovaných úlohách
# ====================================================================
sys . stdout = io . TextIOWrapper ( sys . stdout . buffer , encoding = ' utf-8 ' , errors = ' replace ' )
sys . stderr = io . TextIOWrapper ( sys . stderr . buffer , encoding = ' utf-8 ' , errors = ' replace ' )
"""
FIO MULTI– ACCOUNT IMPORTER — VERZE S ROBUSTNĚJŠÍM HANDLINGEM PK
===============================================================
- mysql.connector (Oracle) pro stabilní manipulaci s datovými typy
- Bezpečné generování id_operace, pokud chybí Column22
- Správné mapování id_pokynu = Column19
- Detailní logování chybných řádků
"""
# =========================================
# CONFIGURATION
# =========================================
ACCOUNTS_FILE = r " accounts.json "
JSON_BASE_DIR = r " z: \ Dropbox \ !!!Days \ Downloads Z230 \ Fio "
DB = {
" host " : " 192.168.1.76 " ,
" port " : 3307 ,
" user " : " root " ,
" password " : " Vlado9674+ " ,
" database " : " fio " ,
" charset " : " utf8mb4 " ,
}
BATCH_SIZE = 500
DAYS_BACK = 90
# Zapíná detailnější logování při chybách insertu
DEBUG_ON_ERROR = True
# =========================================
# HELPERS
# =========================================
def load_accounts ( path : str ) - > List [ Dict [ str , str ] ] :
""" Reads accounts.json and validates content. """
with open ( path , " r " , encoding = " utf-8 " ) as f :
accounts = json . load ( f )
for acc in accounts :
for key in ( " name " , " account_number " , " token " ) :
if key not in acc :
raise ValueError ( f " Missing ' { key } ' in account config: { acc } " )
return accounts
def fio_url_for_period ( token : str , d_from : date , d_to : date ) - > str :
""" Constructs the URL for Fio REST API periods endpoint. """
from_str = d_from . strftime ( " % Y- % m- %d " )
to_str = d_to . strftime ( " % Y- % m- %d " )
return f " https://fioapi.fio.cz/v1/rest/periods/ { token } / { from_str } / { to_str } /transactions.json "
def fetch_fio_json ( token : str , d_from : date , d_to : date ) - > Any :
""" Calls Fio API and fetches JSON. """
url = fio_url_for_period ( token , d_from , d_to )
resp = requests . get ( url , timeout = 30 )
if resp . status_code != 200 :
print ( f " ❌ HTTP { resp . status_code } from Fio: { url } " , flush = True )
return None
try :
return resp . json ( )
except json . JSONDecodeError :
print ( " ❌ Cannot decode JSON from Fio response " , flush = True )
return None
def safe_col ( t : dict , n : int ) - > Any :
""" SAFE ACCESSOR for Fio transaction column numbers (ColumnN). """
key = f " column { n } "
val = t . get ( key )
if not val :
return None
return val . get ( " value " )
def clean_date ( dt_str : str ) - > str :
""" Strips timezone from Fio date string ( " YYYY-MM-DD+HH:MM " ) → " YYYY-MM-DD " . """
if not dt_str :
return None
return str ( dt_str ) [ : 10 ]
def ensure_dir ( path : Path ) :
""" Creates directory if it doesn’ t exist. """
path . mkdir ( parents = True , exist_ok = True )
def save_json_for_account ( base_dir : str , account_cfg : dict , data : dict , d_from : date , d_to : date ) - > Path :
""" Saves raw JSON to disk. """
acc_num_raw = account_cfg [ " account_number " ]
acc_folder_name = acc_num_raw . replace ( " / " , " _ " )
out_dir = Path ( base_dir ) / acc_folder_name
ensure_dir ( out_dir )
filename = f " { d_from . strftime ( ' % Y- % m- %d ' ) } _to_ { d_to . strftime ( ' % Y- % m- %d ' ) } .json "
out_path = out_dir / filename
with open ( out_path , " w " , encoding = " utf-8 " ) as f :
json . dump ( data , f , ensure_ascii = False , indent = 2 )
return out_path
def generate_fallback_id ( fio_acc_id : str , t : dict ) - > str :
"""
Vygeneruje deterministický fallback ID, pokud chybí Column22 (id pohybu).
Použije SHA1 hash z několika sloupců a ořízne na 20 znaků, aby se vešlo
do VARCHAR(20) primárního klíče.
"""
raw_date = clean_date ( safe_col ( t , 0 ) ) or " "
amount = str ( safe_col ( t , 1 ) or " " )
protiucet = str ( safe_col ( t , 2 ) or " " )
vs = str ( safe_col ( t , 5 ) or " " )
source = f " { fio_acc_id } | { raw_date } | { amount } | { protiucet } | { vs } "
digest = hashlib . sha1 ( source . encode ( " utf-8 " ) ) . hexdigest ( )
return digest [ : 20 ]
# =========================================
# MAIN IMPORT LOGIC
# =========================================
def main ( ) :
start_all = time . time ( )
today = date . today ( )
d_from = today - timedelta ( days = DAYS_BACK )
d_to = today
print ( " === Fio multi-account import v3 (PK fix, lepší logování) === " , flush = True )
print ( f " Období: { d_from } až { d_to } " , flush = True )
# Load all accounts from accounts.json
try :
accounts = load_accounts ( ACCOUNTS_FILE )
except Exception as e :
print ( f " FATÁLNÍ CHYBA při načítání účtů: { e } " , flush = True )
return
print ( f " Účtů v konfiguraci: { len ( accounts ) } \n " , flush = True )
# Připojení k DB
try :
conn = mysql . connector . connect (
host = DB [ " host " ] ,
port = DB [ " port " ] ,
user = DB [ " user " ] ,
password = DB [ " password " ] ,
database = DB [ " database " ] ,
charset = DB [ " charset " ] ,
)
cur = conn . cursor ( )
except Error as e :
print ( f " FATÁLNÍ CHYBA při připojení k DB: { e } " , flush = True )
return
# SQL INSERT dotaz přizpůsobený nové DB struktuře
sql = """
INSERT INTO transactions
(
id_operace, cislo_uctu, transaction_date, amount, currency,
protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ,
vs, ks, ss, uziv_identifikace, zprava_pro_prijemce,
provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce
)
VALUES
(
%(id_operace)s , %(cislo_uctu)s , %(transaction_date)s , %(amount)s , %(currency)s ,
%(protiucet)s , %(kod_banky)s , %(nazev_protiuctu)s , %(nazev_banky)s , %(typ)s ,
%(vs)s , %(ks)s , %(ss)s , %(uziv_identifikace)s , %(zprava_pro_prijemce)s ,
%(provedl)s , %(id_pokynu)s , %(komentar)s , %(upr_objem_mena)s , %(api_bic)s , %(reference_platce)s
)
ON DUPLICATE KEY UPDATE
cislo_uctu = VALUES(cislo_uctu),
transaction_date = VALUES(transaction_date),
amount = VALUES(amount),
currency = VALUES(currency),
protiucet = VALUES(protiucet),
kod_banky = VALUES(kod_banky),
nazev_protiuctu = VALUES(nazev_protiuctu),
nazev_banky = VALUES(nazev_banky),
typ = VALUES(typ),
vs = VALUES(vs),
ks = VALUES(ks),
ss = VALUES(ss),
uziv_identifikace = VALUES(uziv_identifikace),
zprava_pro_prijemce = VALUES(zprava_pro_prijemce),
provedl = VALUES(provedl),
id_pokynu = VALUES(id_pokynu),
komentar = VALUES(komentar),
upr_objem_mena = VALUES(upr_objem_mena),
api_bic = VALUES(api_bic),
reference_platce = VALUES(reference_platce)
"""
total_inserted = 0
total_skipped_pk = 0
total_skipped_error = 0
# ======================================================
# PROCESS EACH ACCOUNT IN accounts.json
# ======================================================
for acc in accounts :
name = acc [ " name " ]
cfg_acc_num = acc [ " account_number " ]
token = acc [ " token " ]
print ( f " --- Účet: { name } ( { cfg_acc_num } ) --- " , flush = True )
t0 = time . time ( )
# 1) Download JSON from Fio API
data = fetch_fio_json ( token , d_from , d_to )
if data is None :
print ( " Přeskakuji, žádná data / chyba API. \n " , flush = True )
continue
# 2) Save raw JSON file to disk
try :
json_path = save_json_for_account ( JSON_BASE_DIR , acc , data , d_from , d_to )
print ( f " JSON uložen do: { json_path } " , flush = True )
except Exception as e :
print ( f " ❌ Chyba při ukládání JSON souboru: { e } " , flush = True )
# 3) Extract transactions from JSON tree
tlist = data . get ( " accountStatement " , { } ) . get ( " transactionList " , { } ) . get ( " transaction " , [ ] )
if isinstance ( tlist , dict ) :
tlist = [ tlist ]
print ( f " Počet transakcí v období: { len ( tlist ) } " , flush = True )
if not tlist :
print ( " Žádné transakce, jdu dál. \n " , flush = True )
continue
fio_acc_id = data . get ( " accountStatement " , { } ) . get ( " info " , { } ) . get ( " accountId " )
if cfg_acc_num and fio_acc_id and cfg_acc_num . split ( " / " ) [ 0 ] not in fio_acc_id :
print (
f " ⚠ Upozornění: accountId z Fio ( { fio_acc_id } ) "
f " se neshoduje s account_number v konfiguraci ( { cfg_acc_num } ) " ,
flush = True ,
)
# 4) Build list of MySQL rows
rows = [ ]
skipped_pk_account = 0
for t in tlist :
# id_operace = Column22 (tvé "id pohybu")
id_operace_val = safe_col ( t , 22 )
# Pokud chybí, vygenerujeme si stabilní fallback (hash) – vejde se do VARCHAR(20)
if id_operace_val is None :
fallback = generate_fallback_id ( fio_acc_id or " " , t )
id_operace_val = fallback
# Můžeš odkomentovat, pokud chceš vidět, kde se používá fallback
# print(f" ⚠ Fallback id_operace (hash) pro transakci: {fallback}", flush=True)
# Bez PK nemá smysl zápis – jen pro jistotu, fallback by měl vše pokrýt
if id_operace_val is None :
skipped_pk_account + = 1
continue
transaction_date = clean_date ( safe_col ( t , 0 ) )
if not transaction_date :
# Bez data by insert stejně spadl (NOT NULL), tak to raději přeskočíme
if DEBUG_ON_ERROR :
print ( f " ⚠ Přeskakuji transakci bez data, id_operace= { id_operace_val } " , flush = True )
skipped_pk_account + = 1
continue
id_pokynu_val = safe_col ( t , 19 ) # tvé "id pokynu" = Column19
row = {
" id_operace " : str ( id_operace_val ) ,
" cislo_uctu " : fio_acc_id ,
" transaction_date " : transaction_date ,
" amount " : safe_col ( t , 1 ) ,
" currency " : safe_col ( t , 14 ) ,
" typ " : safe_col ( t , 8 ) ,
" provedl " : safe_col ( t , 9 ) ,
" protiucet " : safe_col ( t , 2 ) ,
" kod_banky " : safe_col ( t , 3 ) ,
" nazev_protiuctu " : safe_col ( t , 10 ) ,
" nazev_banky " : safe_col ( t , 12 ) ,
" api_bic " : safe_col ( t , 26 ) ,
" vs " : safe_col ( t , 5 ) ,
" ks " : safe_col ( t , 4 ) ,
" ss " : safe_col ( t , 6 ) ,
" zprava_pro_prijemce " : safe_col ( t , 16 ) ,
" uziv_identifikace " : safe_col ( t , 7 ) ,
" komentar " : safe_col ( t , 25 ) ,
" upr_objem_mena " : safe_col ( t , 18 ) ,
" id_pokynu " : str ( id_pokynu_val ) if id_pokynu_val is not None else None ,
" reference_platce " : safe_col ( t , 27 ) ,
}
rows . append ( row )
if skipped_pk_account :
print ( f " ⚠ Přeskočeno { skipped_pk_account } transakcí kvůli chybějícímu/invalidnímu PK nebo datu. " , flush = True )
total_skipped_pk + = skipped_pk_account
# 5) INSERT rows into MySQL in batches
inserted = 0
skipped_error_account = 0
for i in range ( 0 , len ( rows ) , BATCH_SIZE ) :
chunk = rows [ i : i + BATCH_SIZE ]
try :
cur . executemany ( sql , chunk )
conn . commit ( )
inserted + = len ( chunk )
except Error as e :
print ( f " ❌ Chyba při zápisu batch do DB: { e } " , flush = True )
conn . rollback ( )
if DEBUG_ON_ERROR :
print ( " ► Přecházím na per-row insert pro detail chyb... " , flush = True )
for row in chunk :
try :
cur . execute ( sql , row )
conn . commit ( )
inserted + = 1
except Error as e_row :
skipped_error_account + = 1
conn . rollback ( )
print (
f " ✗ Chybná transakce id_operace= { row . get ( ' id_operace ' ) } "
f " datum= { row . get ( ' transaction_date ' ) } částka= { row . get ( ' amount ' ) } "
f " → { e_row } " ,
flush = True ,
)
elapsed = time . time ( ) - t0
total_inserted + = inserted
total_skipped_error + = skipped_error_account
print (
f " ✓ Zapsáno (insert/update): { inserted } řádků do DB "
f " (přeskočeno chybějící PK/dat { skipped_pk_account } , chybou insertu { skipped_error_account } ) "
f " za { elapsed : .2f } s \n " ,
flush = True ,
)
# Close DB
cur . close ( )
conn . close ( )
total_elapsed = time . time ( ) - start_all
print (
f " === Hotovo. Celkem zapsáno { total_inserted } transakcí. "
f " Přeskočeno kvůli PK/datům: { total_skipped_pk } , kvůli chybě insertu: { total_skipped_error } . "
f " Celkový čas: { total_elapsed : .2f } s === " ,
flush = True ,
)
# ======================================================
# ENTRY POINT
# ======================================================
if __name__ == " __main__ " :
main ( )