354 lines
10 KiB
Python
354 lines
10 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import sys
|
||
import io
|
||
|
||
# Force UTF-8 output for Scheduled Tasks
|
||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
|
||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
|
||
|
||
|
||
"""
|
||
FIO MULTI–ACCOUNT IMPORTER — FULLY COMMENTED VERSION
|
||
====================================================
|
||
|
||
This script downloads transactions for **multiple Fio bank accounts**
|
||
(using their API tokens) and imports them into a MySQL database
|
||
(`fio.transactions` table).
|
||
|
||
It also saves the raw JSON responses into a folder structure
|
||
for backup / debugging / later use.
|
||
|
||
Main features:
|
||
• Reads all accounts from accounts.json
|
||
• Downloads last N days (default 90)
|
||
• Saves JSON files to disk
|
||
• Extracts all transactions with safe parsing
|
||
• Inserts into MySQL with ON DUPLICATE KEY UPDATE
|
||
• Efficient batch insertion (executemany)
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import time
|
||
from datetime import date, timedelta
|
||
from pathlib import Path
|
||
|
||
import requests # used to call Fio REST API
|
||
import pymysql # MySQL driver
|
||
|
||
|
||
# =========================================
|
||
# CONFIGURATION
|
||
# =========================================
|
||
|
||
# JSON file containing multiple account configs:
|
||
# [
|
||
# { "name": "CZK rodina", "account_number": "2100046291", "token": "xxx" },
|
||
# ...
|
||
# ]
|
||
ACCOUNTS_FILE = r"c:\users\vlado\PycharmProjects\FIO\accounts.json"
|
||
|
||
# Directory where raw JSON files from Fio API will be stored.
|
||
JSON_BASE_DIR = r"z:\Dropbox\!!!Days\Downloads Z230\Fio"
|
||
|
||
# MySQL connection parameters
|
||
DB = {
|
||
"host": "192.168.1.76",
|
||
"port": 3307,
|
||
"user": "root",
|
||
"password": "Vlado9674+",
|
||
"database": "fio",
|
||
"charset": "utf8mb4",
|
||
}
|
||
|
||
# How many transactions insert per batch (performance tuning)
|
||
BATCH_SIZE = 500
|
||
|
||
# How many days back we load from Fio (default = last 90 days)
|
||
DAYS_BACK = 90
|
||
|
||
|
||
# =========================================
|
||
# HELPERS
|
||
# =========================================
|
||
|
||
def load_accounts(path: str):
|
||
"""
|
||
Reads accounts.json and does simple validation to ensure
|
||
each entry contains: name, account_number, token.
|
||
"""
|
||
with open(path, "r", encoding="utf-8") as f:
|
||
accounts = json.load(f)
|
||
|
||
for acc in accounts:
|
||
for key in ("name", "account_number", "token"):
|
||
if key not in acc:
|
||
raise ValueError(f"Missing '{key}' in account config: {acc}")
|
||
|
||
return accounts
|
||
|
||
|
||
def fio_url_for_period(token: str, d_from: date, d_to: date) -> str:
|
||
"""
|
||
Constructs the exact URL for Fio REST API "periods" endpoint.
|
||
Example:
|
||
https://fioapi.fio.cz/v1/rest/periods/<token>/2025-01-01/2025-01-31/transactions.json
|
||
"""
|
||
from_str = d_from.strftime("%Y-%m-%d")
|
||
to_str = d_to.strftime("%Y-%m-%d")
|
||
return f"https://fioapi.fio.cz/v1/rest/periods/{token}/{from_str}/{to_str}/transactions.json"
|
||
|
||
|
||
def fetch_fio_json(token: str, d_from: date, d_to: date):
|
||
"""
|
||
Calls Fio API and fetches JSON.
|
||
Handles HTTP errors and JSON decoding errors.
|
||
"""
|
||
url = fio_url_for_period(token, d_from, d_to)
|
||
resp = requests.get(url, timeout=30)
|
||
|
||
if resp.status_code != 200:
|
||
print(f" ❌ HTTP {resp.status_code} from Fio: {url}")
|
||
return None
|
||
|
||
try:
|
||
return resp.json()
|
||
except json.JSONDecodeError:
|
||
print(" ❌ Cannot decode JSON from Fio response")
|
||
return None
|
||
|
||
|
||
def safe_col(t: dict, n: int):
|
||
"""
|
||
SAFE ACCESSOR for Fio transaction column numbers.
|
||
|
||
Fio JSON schema example:
|
||
"column5": { "name": "VS", "value": "123456" }
|
||
|
||
But the structure is NOT guaranteed to exist.
|
||
So this function prevents KeyError or NoneType errors.
|
||
|
||
Returns:
|
||
t["columnN"]["value"] or None
|
||
"""
|
||
key = f"column{n}"
|
||
val = t.get(key)
|
||
if not val:
|
||
return None
|
||
return val.get("value")
|
||
|
||
|
||
def clean_date(dt_str: str):
|
||
"""
|
||
Fio returns dates like: "2025-02-14+0100"
|
||
We strip timezone → "2025-02-14"
|
||
"""
|
||
if not dt_str:
|
||
return None
|
||
return dt_str[:10]
|
||
|
||
|
||
def ensure_dir(path: Path):
|
||
"""Creates directory if it doesn’t exist."""
|
||
path.mkdir(parents=True, exist_ok=True)
|
||
|
||
|
||
def save_json_for_account(base_dir: str, account_cfg: dict, data: dict, d_from: date, d_to: date):
|
||
"""
|
||
Saves raw JSON to:
|
||
<base_dir>/<account_number>/YYYY-MM-DD_to_YYYY-MM-DD.json
|
||
|
||
Useful for debugging, backups, or re-imports.
|
||
"""
|
||
acc_num_raw = account_cfg["account_number"]
|
||
acc_folder_name = acc_num_raw.replace("/", "_") # sanitize dir name for filesystem
|
||
|
||
out_dir = Path(base_dir) / acc_folder_name
|
||
ensure_dir(out_dir)
|
||
|
||
filename = f"{d_from.strftime('%Y-%m-%d')}_to_{d_to.strftime('%Y-%m-%d')}.json"
|
||
out_path = out_dir / filename
|
||
|
||
with open(out_path, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
return out_path
|
||
|
||
|
||
# =========================================
|
||
# MAIN IMPORT LOGIC
|
||
# =========================================
|
||
|
||
def main():
|
||
start_all = time.time()
|
||
|
||
# Calculate time range (last N days)
|
||
today = date.today()
|
||
d_from = today - timedelta(days=DAYS_BACK)
|
||
d_to = today
|
||
|
||
print("=== Fio multi-account import ===")
|
||
print(f"Období: {d_from} až {d_to}")
|
||
print("Načítám účty z JSON konfigurace...")
|
||
|
||
# Load all accounts from accounts.json
|
||
accounts = load_accounts(ACCOUNTS_FILE)
|
||
print(f" Účtů v konfiguraci: {len(accounts)}\n")
|
||
|
||
# Connect to database
|
||
conn = pymysql.connect(**DB)
|
||
cur = conn.cursor()
|
||
|
||
# SQL INSERT with ON DUPLICATE KEY UPDATE
|
||
# This means: if transaction already exists (same unique key), update it.
|
||
sql = """
|
||
INSERT INTO transactions
|
||
(
|
||
datum, objem, mena, cislo_uctu, protiucet, kod_banky,
|
||
ks, vs, ss, zprava_pro_prijemce, poznamka,
|
||
id_operace, id_pokynu, ks_1, nazev_banky, nazev_protiuctu,
|
||
ss_1, typ, upresneni_objem, upresneni_mena, vs_1, zadal
|
||
)
|
||
VALUES
|
||
(
|
||
%(datum)s, %(objem)s, %(mena)s, %(cislo_uctu)s, %(protiucet)s, %(kod_banky)s,
|
||
%(ks)s, %(vs)s, %(ss)s, %(zprava)s, %(poznamka)s,
|
||
%(id_operace)s, %(id_pokynu)s, %(ks1)s, %(nazev_banky)s, %(nazev_protiuctu)s,
|
||
%(ss1)s, %(typ)s, %(upr_objem)s, %(upr_mena)s, %(vs1)s, %(zadal)s
|
||
)
|
||
ON DUPLICATE KEY UPDATE
|
||
datum = VALUES(datum),
|
||
objem = VALUES(objem),
|
||
mena = VALUES(mena),
|
||
protiucet = VALUES(protiucet),
|
||
kod_banky = VALUES(kod_banky),
|
||
ks = VALUES(ks),
|
||
vs = VALUES(vs),
|
||
ss = VALUES(ss),
|
||
zprava_pro_prijemce = VALUES(zprava_pro_prijemce),
|
||
poznamka = VALUES(poznamka),
|
||
ks_1 = VALUES(ks_1),
|
||
nazev_banky = VALUES(nazev_banky),
|
||
nazev_protiuctu = VALUES(nazev_protiuctu),
|
||
ss_1 = VALUES(ss_1),
|
||
typ = VALUES(typ),
|
||
upresneni_objem = VALUES(upresneni_objem),
|
||
upresneni_mena = VALUES(upresneni_mena),
|
||
vs_1 = VALUES(vs_1),
|
||
zadal = VALUES(zadal)
|
||
"""
|
||
|
||
total_inserted = 0
|
||
|
||
# ======================================================
|
||
# PROCESS EACH ACCOUNT IN accounts.json
|
||
# ======================================================
|
||
for acc in accounts:
|
||
name = acc["name"]
|
||
cfg_acc_num = acc["account_number"]
|
||
token = acc["token"]
|
||
|
||
print(f"--- Účet: {name} ({cfg_acc_num}) ---")
|
||
t0 = time.time()
|
||
|
||
# --- 1) Download JSON from Fio API
|
||
data = fetch_fio_json(token, d_from, d_to)
|
||
if data is None:
|
||
print(" Přeskakuji, žádná data / chyba API.\n")
|
||
continue
|
||
|
||
# --- 2) Save raw JSON file to disk
|
||
json_path = save_json_for_account(JSON_BASE_DIR, acc, data, d_from, d_to)
|
||
print(f" JSON uložen do: {json_path}")
|
||
|
||
# --- 3) Extract transactions from JSON tree
|
||
tlist = data["accountStatement"]["transactionList"].get("transaction", [])
|
||
|
||
# FIO can return single transaction as an object (not list)
|
||
if isinstance(tlist, dict):
|
||
tlist = [tlist]
|
||
|
||
print(f" Počet transakcí v období: {len(tlist)}")
|
||
|
||
if not tlist:
|
||
print(" Žádné transakce, jdu dál.\n")
|
||
continue
|
||
|
||
# FIO returns account ID under accountStatement.info.accountId
|
||
fio_acc_id = data["accountStatement"]["info"]["accountId"]
|
||
|
||
# Warn if account ID in JSON doesn't match config (informational only)
|
||
if cfg_acc_num and cfg_acc_num.split("/")[0] not in fio_acc_id:
|
||
print(f" ⚠ Upozornění: accountId z Fio ({fio_acc_id}) "
|
||
f"se neshoduje s account_number v konfiguraci ({cfg_acc_num})")
|
||
|
||
# --- 4) Build list of MySQL rows
|
||
rows = []
|
||
for t in tlist:
|
||
row = {
|
||
"datum": clean_date(safe_col(t, 0)),
|
||
"objem": safe_col(t, 1),
|
||
"mena": safe_col(t, 14),
|
||
|
||
"cislo_uctu": fio_acc_id,
|
||
"protiucet": safe_col(t, 2),
|
||
"kod_banky": safe_col(t, 3),
|
||
|
||
"ks": safe_col(t, 4),
|
||
"vs": safe_col(t, 5),
|
||
"ss": safe_col(t, 6),
|
||
|
||
"zprava": safe_col(t, 16),
|
||
"poznamka": safe_col(t, 25),
|
||
|
||
"id_operace": safe_col(t, 22),
|
||
"id_pokynu": safe_col(t, 24),
|
||
|
||
"ks1": safe_col(t, 18),
|
||
"nazev_banky": safe_col(t, 15),
|
||
"nazev_protiuctu": safe_col(t, 10),
|
||
|
||
"ss1": safe_col(t, 19),
|
||
"typ": safe_col(t, 8),
|
||
|
||
"upr_objem": safe_col(t, 20),
|
||
"upr_mena": safe_col(t, 21),
|
||
"vs1": safe_col(t, 17),
|
||
|
||
"zadal": safe_col(t, 12),
|
||
}
|
||
rows.append(row)
|
||
|
||
# --- 5) INSERT rows into MySQL in batches
|
||
inserted = 0
|
||
|
||
for i in range(0, len(rows), BATCH_SIZE):
|
||
chunk = rows[i : i + BATCH_SIZE]
|
||
cur.executemany(sql, chunk) # fast multi-row insert/update
|
||
conn.commit()
|
||
inserted += len(chunk)
|
||
|
||
elapsed = time.time() - t0
|
||
total_inserted += inserted
|
||
|
||
print(f" ✓ Zapsáno (insert/update): {inserted} řádků do DB za {elapsed:.2f} s\n")
|
||
|
||
# Close DB
|
||
cur.close()
|
||
conn.close()
|
||
|
||
total_elapsed = time.time() - start_all
|
||
|
||
print(f"=== Hotovo. Celkem zapsáno {total_inserted} transakcí. "
|
||
f"Celkový čas: {total_elapsed:.2f} s ===")
|
||
|
||
|
||
# ======================================================
|
||
# ENTRY POINT
|
||
# ======================================================
|
||
|
||
if __name__ == "__main__":
|
||
main()
|