Add FIO scripts and remove .idea from tracking

- Add FIO bank scripts (fio 01-03, diagnostika, multi-account reporter)
- Remove .idea/ IDE config files from git (already in .gitignore)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-25 20:13:54 +01:00
parent 89511c500f
commit 11e71e3f2d
7 changed files with 707 additions and 17 deletions
Generated
-10
View File
@@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.13 (FIO)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>
-7
View File
@@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12 (FIO)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.13 (FIO)" project-jdk-type="Python SDK" />
</project>
@@ -0,0 +1,351 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
# Force UTF-8 output for Scheduled Tasks
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
FIO MULTIACCOUNT IMPORTER — FULLY COMMENTED VERSION
====================================================
This script downloads transactions for **multiple Fio bank accounts**
(using their API tokens) and imports them into a MySQL database
(`fio.transactions` table).
It also saves the raw JSON responses into a folder structure
for backup / debugging / later use.
Main features:
• Reads all accounts from accounts.json
• Downloads last N days (default 90)
• Saves JSON files to disk
• Extracts all transactions with safe parsing
• Inserts into MySQL with ON DUPLICATE KEY UPDATE
• Efficient batch insertion (executemany)
"""
import os
import json
import time
from datetime import date, timedelta
from pathlib import Path
import requests # used to call Fio REST API
import pymysql # MySQL driver
# =========================================
# CONFIGURATION
# =========================================
# JSON file containing multiple account configs:
# [
# { "name": "CZK rodina", "account_number": "2100046291", "token": "xxx" },
# ...
# ]
ACCOUNTS_FILE = r"../accounts.json"
# Directory where raw JSON files from Fio API will be stored.
JSON_BASE_DIR = r"u:\Dropbox\!!!Days\Downloads Z230\Fio"
# MySQL connection parameters
DB = {
"host": "192.168.1.76",
"port": 3306,
"user": "root",
"password": "Vlado9674+",
"database": "fio",
"charset": "utf8mb4",
}
# How many transactions insert per batch (performance tuning)
BATCH_SIZE = 500
# How many days back we load from Fio (default = last 90 days)
DAYS_BACK = 90
# =========================================
# HELPERS
# =========================================
def load_accounts(path: str):
"""
Reads accounts.json and does simple validation to ensure
each entry contains: name, account_number, token.
"""
with open(path, "r", encoding="utf-8") as f:
accounts = json.load(f)
for acc in accounts:
for key in ("name", "account_number", "token"):
if key not in acc:
raise ValueError(f"Missing '{key}' in account config: {acc}")
return accounts
def fio_url_for_period(token: str, d_from: date, d_to: date) -> str:
"""
Constructs the exact URL for Fio REST API "periods" endpoint.
Example:
https://fioapi.fio.cz/v1/rest/periods/<token>/2025-01-01/2025-01-31/transactions.json
"""
from_str = d_from.strftime("%Y-%m-%d")
to_str = d_to.strftime("%Y-%m-%d")
return f"https://fioapi.fio.cz/v1/rest/periods/{token}/{from_str}/{to_str}/transactions.json"
def fetch_fio_json(token: str, d_from: date, d_to: date):
"""
Calls Fio API and fetches JSON.
Handles HTTP errors and JSON decoding errors.
"""
url = fio_url_for_period(token, d_from, d_to)
resp = requests.get(url, timeout=30)
if resp.status_code != 200:
print(f" ❌ HTTP {resp.status_code} from Fio: {url}")
return None
try:
return resp.json()
except json.JSONDecodeError:
print(" ❌ Cannot decode JSON from Fio response")
return None
def safe_col(t: dict, n: int):
"""
SAFE ACCESSOR for Fio transaction column numbers.
Fio JSON schema example:
"column5": { "name": "VS", "value": "123456" }
But the structure is NOT guaranteed to exist.
So this function prevents KeyError or NoneType errors.
Returns:
t["columnN"]["value"] or None
"""
key = f"column{n}"
val = t.get(key)
if not val:
return None
return val.get("value")
def clean_date(dt_str: str):
"""
Fio returns dates like: "2025-02-14+0100"
We strip timezone → "2025-02-14"
"""
if not dt_str:
return None
return dt_str[:10]
def ensure_dir(path: Path):
"""Creates directory if it doesnt exist."""
path.mkdir(parents=True, exist_ok=True)
def save_json_for_account(base_dir: str, account_cfg: dict, data: dict, d_from: date, d_to: date):
"""
Saves raw JSON to:
<base_dir>/<account_number>/YYYY-MM-DD_to_YYYY-MM-DD.json
Useful for debugging, backups, or re-imports.
"""
acc_num_raw = account_cfg["account_number"]
acc_folder_name = acc_num_raw.replace("/", "_") # sanitize dir name for filesystem
out_dir = Path(base_dir) / acc_folder_name
ensure_dir(out_dir)
filename = f"{d_from.strftime('%Y-%m-%d')}_to_{d_to.strftime('%Y-%m-%d')}.json"
out_path = out_dir / filename
with open(out_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return out_path
# =========================================
# MAIN IMPORT LOGIC
# =========================================
def main():
start_all = time.time()
# Calculate time range (last N days)
today = date.today()
d_from = today - timedelta(days=DAYS_BACK)
d_to = today
print("=== Fio multi-account import ===")
print(f"Období: {d_from}{d_to}")
print("Načítám účty z JSON konfigurace...")
# Load all accounts from accounts.json
accounts = load_accounts(ACCOUNTS_FILE)
print(f" Účtů v konfiguraci: {len(accounts)}\n")
# Connect to database
conn = pymysql.connect(**DB)
cur = conn.cursor()
# SQL INSERT with ON DUPLICATE KEY UPDATE
# This means: if transaction already exists (same unique key), update it.
sql = """
INSERT INTO transactions
(
datum, objem, mena, cislo_uctu, protiucet, kod_banky,
ks, vs, ss, zprava_pro_prijemce, poznamka,
id_operace, id_pokynu,
nazev_banky, nazev_protiuctu,
typ, upresneni_objem, upresneni_mena, zadal
)
VALUES
(
%(datum)s, %(objem)s, %(mena)s, %(cislo_uctu)s, %(protiucet)s, %(kod_banky)s,
%(ks)s, %(vs)s, %(ss)s, %(zprava)s, %(poznamka)s,
%(id_operace)s, %(id_pokynu)s,
%(nazev_banky)s, %(nazev_protiuctu)s,
%(typ)s, %(upr_objem)s, %(upr_mena)s, %(zadal)s
)
ON DUPLICATE KEY UPDATE
datum = VALUES(datum),
objem = VALUES(objem),
mena = VALUES(mena),
protiucet = VALUES(protiucet),
kod_banky = VALUES(kod_banky),
ks = VALUES(ks),
vs = VALUES(vs),
ss = VALUES(ss),
zprava_pro_prijemce = VALUES(zprava_pro_prijemce),
poznamka = VALUES(poznamka),
nazev_banky = VALUES(nazev_banky),
nazev_protiuctu = VALUES(nazev_protiuctu),
typ = VALUES(typ),
upresneni_objem = VALUES(upresneni_objem),
upresneni_mena = VALUES(upresneni_mena),
zadal = VALUES(zadal),
id_operace = VALUES(id_operace),
id_pokynu = VALUES(id_pokynu)
"""
total_inserted = 0
# ======================================================
# PROCESS EACH ACCOUNT IN accounts.json
# ======================================================
for acc in accounts:
name = acc["name"]
cfg_acc_num = acc["account_number"]
token = acc["token"]
print(f"--- Účet: {name} ({cfg_acc_num}) ---")
t0 = time.time()
# --- 1) Download JSON from Fio API
data = fetch_fio_json(token, d_from, d_to)
if data is None:
print(" Přeskakuji, žádná data / chyba API.\n")
continue
# --- 2) Save raw JSON file to disk
json_path = save_json_for_account(JSON_BASE_DIR, acc, data, d_from, d_to)
print(f" JSON uložen do: {json_path}")
# --- 3) Extract transactions from JSON tree
tlist = data["accountStatement"]["transactionList"].get("transaction", [])
# FIO can return single transaction as an object (not list)
if isinstance(tlist, dict):
tlist = [tlist]
print(f" Počet transakcí v období: {len(tlist)}")
if not tlist:
print(" Žádné transakce, jdu dál.\n")
continue
# FIO returns account ID under accountStatement.info.accountId
fio_acc_id = data["accountStatement"]["info"]["accountId"]
# Warn if account ID in JSON doesn't match config (informational only)
if cfg_acc_num and cfg_acc_num.split("/")[0] not in fio_acc_id:
print(f" ⚠ Upozornění: accountId z Fio ({fio_acc_id}) "
f"se neshoduje s account_number v konfiguraci ({cfg_acc_num})")
# --- 4) Build list of MySQL rows
rows = []
for t in tlist:
row = {
"datum": clean_date(safe_col(t, 0)),
"objem": safe_col(t, 1),
"mena": safe_col(t, 14),
"cislo_uctu": fio_acc_id,
"protiucet": safe_col(t, 2),
"kod_banky": safe_col(t, 3),
"ks": safe_col(t, 4),
"vs": safe_col(t, 5),
"ss": safe_col(t, 6),
"zprava": safe_col(t, 16),
"poznamka": safe_col(t, 25),
"id_operace": safe_col(t, 22), # ID pohybu
"id_pokynu": safe_col(t, 19), # ID pokynu
"nazev_banky": safe_col(t, 15),
"nazev_protiuctu": safe_col(t, 10),
"typ": safe_col(t, 8),
"upr_objem": safe_col(t, 20),
"upr_mena": safe_col(t, 21),
"zadal": safe_col(t, 12),
}
rows.append(row)
# --- 5) INSERT rows into MySQL in batches
inserted = 0
for i in range(0, len(rows), BATCH_SIZE):
chunk = rows[i : i + BATCH_SIZE]
cur.executemany(sql, chunk) # fast multi-row insert/update
conn.commit()
inserted += len(chunk)
elapsed = time.time() - t0
total_inserted += inserted
print(f" ✓ Zapsáno (insert/update): {inserted} řádků do DB za {elapsed:.2f} s\n")
# Close DB
cur.close()
conn.close()
total_elapsed = time.time() - start_all
print(f"=== Hotovo. Celkem zapsáno {total_inserted} transakcí. "
f"Celkový čas: {total_elapsed:.2f} s ===")
# ======================================================
# ENTRY POINT
# ======================================================
if __name__ == "__main__":
main()
+24
View File
@@ -0,0 +1,24 @@
import pymysql
from pymysql.cursors import DictCursor
conn = pymysql.connect(
host="192.168.1.76",
port=3307,
user="root",
password="Vlado9674+",
database="fio",
charset="utf8mb4",
cursorclass=DictCursor
)
with conn.cursor() as cur:
cur.execute("SHOW TABLES;")
print("📋 Tables:", [r[f"Tables_in_fio"] for r in cur.fetchall()])
cur.execute("SELECT COUNT(*) AS cnt FROM transactions;")
print("🧾 Rows in `transactions`:", cur.fetchone()["cnt"])
cur.execute("SHOW COLUMNS FROM transactions;")
print("\n📊 Columns:")
for r in cur.fetchall():
print(" -", r["Field"])
+117
View File
@@ -0,0 +1,117 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Export Fio transactions (from MySQL → Excel)
--------------------------------------------
- Reads only cislo_uctu = '2800046620'
- For OZP (protiucet=2070101041) includes only positive objem
- Each sheet = insurance company (filtered by protiucet)
- First sheet = summary with total amounts and transaction counts
"""
import pandas as pd
import pymysql
from pathlib import Path
from datetime import datetime
# ======== CONFIG ========
MYSQL_CONFIG = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "fio",
"charset": "utf8mb4",
}
REPORTOVAT = {
"VZP": "1114007221",
"VOZP": "2010009091",
"ČPZP": "2054108761",
"OZP": "2070101041",
"ZPŠ": "2090309181",
"ZPMV": "2112108031",
}
EXPORT_PATH = Path(r"u:\Dropbox\!!!Days\Downloads Z230") / f"Fio_report_{datetime.now():%Y-%m-%d_%H-%M-%S}.xlsx"
# ======== LOAD DATA ========
def load_data():
print("🔄 Načítám data z MySQL (účet 2800046620, pro OZP jen kladné objemy)...")
conn = pymysql.connect(**MYSQL_CONFIG)
sql = """
SELECT *
FROM transactions
WHERE cislo_uctu = '2800046620'
AND (
protiucet <> '2070101041'
OR (protiucet = '2070101041' AND objem > 0)
);
"""
df = pd.read_sql(sql, conn)
conn.close()
df.columns = df.columns.str.strip()
print(f"✅ Načteno {len(df)} řádků, {len(df.columns)} sloupců.")
return df
# ======== EXPORT TO EXCEL ========
def export_to_excel(df):
summary_rows = [] # to collect summary per insurer
with pd.ExcelWriter(EXPORT_PATH, engine="openpyxl") as writer:
# --- INDIVIDUAL SHEETS ---
for name, acc in REPORTOVAT.items():
filtered = df[df["protiucet"].astype(str) == acc]
if filtered.empty:
print(f"⚠️ {name}: žádné transakce (účet {acc})")
summary_rows.append({
"Pojišťovna": name,
"Číslo účtu": acc,
"Počet transakcí": 0,
"Součet objemu": 0.0
})
continue
# safe numeric conversion
filtered = filtered.copy()
filtered["objem_num"] = (
filtered["objem"]
.astype(str)
.str.replace("\u00A0", "", regex=False)
.str.replace(",", ".", regex=False)
.astype(float)
)
# --- summary data ---
total_sum = filtered["objem_num"].sum()
total_count = len(filtered)
summary_rows.append({
"Pojišťovna": name,
"Číslo účtu": acc,
"Počet transakcí": total_count,
"Součet objemu": round(total_sum, 2)
})
# --- write detailed sheet ---
filtered.to_excel(writer, index=False, sheet_name=name)
print(f"{name}: {len(filtered)} řádků exportováno, součet {total_sum:,.2f}")
# --- SUMMARY SHEET ---
summary_df = pd.DataFrame(summary_rows)
summary_df["Součet objemu"] = summary_df["Součet objemu"].map("{:,.2f}".format)
summary_df.to_excel(writer, index=False, sheet_name="Přehled")
print("🧾 Přidán přehledový list s celkovými součty.")
print(f"\n📊 Hotovo! Soubor uložen jako:\n{EXPORT_PATH}")
# ======== MAIN ========
if __name__ == "__main__":
df = load_data()
export_to_excel(df)
+168
View File
@@ -0,0 +1,168 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Fio CSV import → MySQL (dev version)
------------------------------------
- Always drops & recreates `transactions` table
- Uses real CSV headers as seen in "Vyhledane pohyby (3).csv"
- Unique key = (Číslo účtu, ID operace, ID pokynu)
"""
import csv
from pathlib import Path
from datetime import datetime
import pymysql
from pymysql.cursors import DictCursor
import re
# ======== CONFIG ========
CSV_PATH = Path(r"u:\Dropbox\!!!Days\Downloads Z230\Vyhledane pohyby (3).csv")
TABLE_NAME = "transactions"
MYSQL_CONFIG = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "fio",
"charset": "utf8mb4",
"cursorclass": DictCursor,
"autocommit": True,
}
# ======== HELPERS ========
def clean(s: str):
if not s:
return None
return s.strip() or None
def parse_date(raw: str):
raw = (raw or "").strip()
if not raw:
return None
try:
return datetime.strptime(raw, "%d.%m.%Y").date()
except ValueError:
return None
def parse_float(raw: str):
if raw is None:
return None
s = str(raw).strip()
for ch in (" ", "\u00A0", "\u202F", "\u2007"):
s = s.replace(ch, "")
s = s.replace(",", ".")
s = re.sub(r"[^0-9.+-]", "", s)
try:
return float(s)
except ValueError:
return None
# ======== DB ========
def get_mysql_connection():
return pymysql.connect(**MYSQL_CONFIG)
def recreate_table(conn):
"""Drop and recreate table with schema matching CSV structure."""
sql = f"""
DROP TABLE IF EXISTS `{TABLE_NAME}`;
CREATE TABLE `{TABLE_NAME}` (
id INT AUTO_INCREMENT PRIMARY KEY,
datum DATE,
objem DECIMAL(14,2),
mena CHAR(3),
cislo_uctu VARCHAR(40),
protiucet VARCHAR(40),
kod_banky VARCHAR(20),
ks VARCHAR(20),
vs VARCHAR(20),
ss VARCHAR(20),
zprava_pro_prijemce VARCHAR(500),
poznamka VARCHAR(500),
id_operace VARCHAR(50),
id_pokynu VARCHAR(50),
ks_1 VARCHAR(20),
nazev_banky VARCHAR(100),
nazev_protiuctu VARCHAR(200),
ss_1 VARCHAR(20),
typ VARCHAR(100),
upresneni_objem VARCHAR(100),
upresneni_mena VARCHAR(20),
vs_1 VARCHAR(20),
zadal VARCHAR(200),
imported_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uniq_tx (cislo_uctu, id_operace, id_pokynu)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
with conn.cursor() as cur:
for stmt in sql.strip().split(";"):
if stmt.strip():
cur.execute(stmt)
print(f"✅ Tabulka `{TABLE_NAME}` znovu vytvořena podle CSV struktury.")
# ======== IMPORT ========
def import_fio_csv():
with open(CSV_PATH, "r", encoding="utf-8-sig", newline="") as f:
reader = csv.DictReader(f, delimiter=";", quotechar='"')
rows = list(reader)
total = len(rows)
print(f"📄 Načteno {total} řádků ze souboru {CSV_PATH.name}")
with get_mysql_connection() as conn:
recreate_table(conn)
inserted, skipped = 0, 0
for i, row in enumerate(rows, start=1):
data = {
"datum": parse_date(row.get("Datum")),
"objem": parse_float(row.get("Objem")),
"mena": clean(row.get("Měna")),
"cislo_uctu": clean(row.get("Číslo účtu")),
"protiucet": clean(row.get("Protiúčet")),
"kod_banky": clean(row.get("Kód banky")),
"ks": clean(row.get("KS")),
"vs": clean(row.get("VS")),
"ss": clean(row.get("SS")),
"zprava_pro_prijemce": clean(row.get("Zpráva pro příjemce")),
"poznamka": clean(row.get("Poznámka")),
"id_operace": clean(row.get("ID operace")),
"id_pokynu": clean(row.get("ID pokynu")),
"ks_1": clean(row.get("KS.1")),
"nazev_banky": clean(row.get("Název banky")),
"nazev_protiuctu": clean(row.get("Název protiúčtu")),
"ss_1": clean(row.get("SS.1")),
"typ": clean(row.get("Typ")),
"upresneni_objem": clean(row.get("Upřesnění - objem")),
"upresneni_mena": clean(row.get("Upřesnění - měna")),
"vs_1": clean(row.get("VS.1")),
"zadal": clean(row.get("Zadal")),
}
cols = ", ".join(data.keys())
placeholders = ", ".join(["%s"] * len(data))
sql = f"INSERT IGNORE INTO `{TABLE_NAME}` ({cols}) VALUES ({placeholders})"
with conn.cursor() as cur:
affected = cur.execute(sql, list(data.values()))
if affected:
inserted += 1
else:
skipped += 1
if i % 500 == 0 or i == total:
print(f" {i}/{total} zpracováno... ({inserted} vloženo, {skipped} duplicit)")
print(f"\n✅ Import dokončen: {inserted} nových, {skipped} duplicit přeskočeno.")
# ======== MAIN ========
if __name__ == "__main__":
import_fio_csv()
+47
View File
@@ -0,0 +1,47 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Quick, verified dump of all Fio transactions from MySQL → Excel.
Column names are exactly as in DB.
"""
import pandas as pd
import pymysql
from pymysql.cursors import DictCursor
from pathlib import Path
from datetime import datetime
# ======== CONFIG ========
MYSQL_CONFIG = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "fio",
"charset": "utf8mb4",
}
EXPORT_PATH = Path(r"u:\Dropbox\!!!Days\Downloads Z230") / f"Fio_ALL_{datetime.now():%Y-%m-%d_%H-%M-%S}.xlsx"
# ======== MAIN ========
def dump_all_transactions():
with pymysql.connect(**MYSQL_CONFIG) as conn:
sql = """
SELECT
*
FROM transactions
ORDER BY datum DESC;
"""
df = pd.read_sql(sql, conn)
print(f"✅ Načteno {len(df)} transakcí z MySQL.")
# Save to Excel
df.to_excel(EXPORT_PATH, index=False)
print(f"📊 Excel export hotov:\n{EXPORT_PATH}")
if __name__ == "__main__":
dump_all_transactions()