This commit is contained in:
2025-12-07 07:11:12 +01:00
parent 49ede2b452
commit 4c57c92332
9 changed files with 2087 additions and 0 deletions

View File

@@ -0,0 +1,400 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
import time
from datetime import date, timedelta
from pathlib import Path
import json
import requests
import mysql.connector
from mysql.connector import Error
from typing import Dict, Any, List
import hashlib
# ====================================================================
# A. Vynucení UTF-8 pro správnou diakritiku v plánovaných úlohách
# ====================================================================
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
FIO MULTIACCOUNT IMPORTER — VERZE S ROBUSTNĚJŠÍM HANDLINGEM PK
===============================================================
- mysql.connector (Oracle) pro stabilní manipulaci s datovými typy
- Bezpečné generování id_operace, pokud chybí Column22
- Správné mapování id_pokynu = Column19
- Detailní logování chybných řádků
"""
# =========================================
# CONFIGURATION
# =========================================
ACCOUNTS_FILE = r"c:\users\vlado\PycharmProjects\FIO\accounts.json"
JSON_BASE_DIR = r"z:\Dropbox\!!!Days\Downloads Z230\Fio"
DB = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "fio",
"charset": "utf8mb4",
}
BATCH_SIZE = 500
DAYS_BACK = 90
# Zapíná detailnější logování při chybách insertu
DEBUG_ON_ERROR = True
# =========================================
# HELPERS
# =========================================
def load_accounts(path: str) -> List[Dict[str, str]]:
"""Reads accounts.json and validates content."""
with open(path, "r", encoding="utf-8") as f:
accounts = json.load(f)
for acc in accounts:
for key in ("name", "account_number", "token"):
if key not in acc:
raise ValueError(f"Missing '{key}' in account config: {acc}")
return accounts
def fio_url_for_period(token: str, d_from: date, d_to: date) -> str:
"""Constructs the URL for Fio REST API periods endpoint."""
from_str = d_from.strftime("%Y-%m-%d")
to_str = d_to.strftime("%Y-%m-%d")
return f"https://fioapi.fio.cz/v1/rest/periods/{token}/{from_str}/{to_str}/transactions.json"
def fetch_fio_json(token: str, d_from: date, d_to: date) -> Any:
"""Calls Fio API and fetches JSON."""
url = fio_url_for_period(token, d_from, d_to)
resp = requests.get(url, timeout=30)
if resp.status_code != 200:
print(f" ❌ HTTP {resp.status_code} from Fio: {url}", flush=True)
return None
try:
return resp.json()
except json.JSONDecodeError:
print(" ❌ Cannot decode JSON from Fio response", flush=True)
return None
def safe_col(t: dict, n: int) -> Any:
"""SAFE ACCESSOR for Fio transaction column numbers (ColumnN)."""
key = f"column{n}"
val = t.get(key)
if not val:
return None
return val.get("value")
def clean_date(dt_str: str) -> str:
"""Strips timezone from Fio date string ("YYYY-MM-DD+HH:MM") → "YYYY-MM-DD"."""
if not dt_str:
return None
return str(dt_str)[:10]
def ensure_dir(path: Path):
"""Creates directory if it doesnt exist."""
path.mkdir(parents=True, exist_ok=True)
def save_json_for_account(base_dir: str, account_cfg: dict, data: dict, d_from: date, d_to: date) -> Path:
"""Saves raw JSON to disk."""
acc_num_raw = account_cfg["account_number"]
acc_folder_name = acc_num_raw.replace("/", "_")
out_dir = Path(base_dir) / acc_folder_name
ensure_dir(out_dir)
filename = f"{d_from.strftime('%Y-%m-%d')}_to_{d_to.strftime('%Y-%m-%d')}.json"
out_path = out_dir / filename
with open(out_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return out_path
def generate_fallback_id(fio_acc_id: str, t: dict) -> str:
"""
Vygeneruje deterministický fallback ID, pokud chybí Column22 (id pohybu).
Použije SHA1 hash z několika sloupců a ořízne na 20 znaků, aby se vešlo
do VARCHAR(20) primárního klíče.
"""
raw_date = clean_date(safe_col(t, 0)) or ""
amount = str(safe_col(t, 1) or "")
protiucet = str(safe_col(t, 2) or "")
vs = str(safe_col(t, 5) or "")
source = f"{fio_acc_id}|{raw_date}|{amount}|{protiucet}|{vs}"
digest = hashlib.sha1(source.encode("utf-8")).hexdigest()
return digest[:20]
# =========================================
# MAIN IMPORT LOGIC
# =========================================
def main():
start_all = time.time()
today = date.today()
d_from = today - timedelta(days=DAYS_BACK)
d_to = today
print("=== Fio multi-account import v3 (PK fix, lepší logování) ===", flush=True)
print(f"Období: {d_from}{d_to}", flush=True)
# Load all accounts from accounts.json
try:
accounts = load_accounts(ACCOUNTS_FILE)
except Exception as e:
print(f"FATÁLNÍ CHYBA při načítání účtů: {e}", flush=True)
return
print(f" Účtů v konfiguraci: {len(accounts)}\n", flush=True)
# Připojení k DB
try:
conn = mysql.connector.connect(
host=DB["host"],
port=DB["port"],
user=DB["user"],
password=DB["password"],
database=DB["database"],
charset=DB["charset"],
)
cur = conn.cursor()
except Error as e:
print(f"FATÁLNÍ CHYBA při připojení k DB: {e}", flush=True)
return
# SQL INSERT dotaz přizpůsobený nové DB struktuře
sql = """
INSERT INTO transactions
(
id_operace, cislo_uctu, transaction_date, amount, currency,
protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ,
vs, ks, ss, uziv_identifikace, zprava_pro_prijemce,
provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce
)
VALUES
(
%(id_operace)s, %(cislo_uctu)s, %(transaction_date)s, %(amount)s, %(currency)s,
%(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s,
%(vs)s, %(ks)s, %(ss)s, %(uziv_identifikace)s, %(zprava_pro_prijemce)s,
%(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s, %(reference_platce)s
)
ON DUPLICATE KEY UPDATE
cislo_uctu = VALUES(cislo_uctu),
transaction_date = VALUES(transaction_date),
amount = VALUES(amount),
currency = VALUES(currency),
protiucet = VALUES(protiucet),
kod_banky = VALUES(kod_banky),
nazev_protiuctu = VALUES(nazev_protiuctu),
nazev_banky = VALUES(nazev_banky),
typ = VALUES(typ),
vs = VALUES(vs),
ks = VALUES(ks),
ss = VALUES(ss),
uziv_identifikace = VALUES(uziv_identifikace),
zprava_pro_prijemce = VALUES(zprava_pro_prijemce),
provedl = VALUES(provedl),
id_pokynu = VALUES(id_pokynu),
komentar = VALUES(komentar),
upr_objem_mena = VALUES(upr_objem_mena),
api_bic = VALUES(api_bic),
reference_platce = VALUES(reference_platce)
"""
total_inserted = 0
total_skipped_pk = 0
total_skipped_error = 0
# ======================================================
# PROCESS EACH ACCOUNT IN accounts.json
# ======================================================
for acc in accounts:
name = acc["name"]
cfg_acc_num = acc["account_number"]
token = acc["token"]
print(f"--- Účet: {name} ({cfg_acc_num}) ---", flush=True)
t0 = time.time()
# 1) Download JSON from Fio API
data = fetch_fio_json(token, d_from, d_to)
if data is None:
print(" Přeskakuji, žádná data / chyba API.\n", flush=True)
continue
# 2) Save raw JSON file to disk
try:
json_path = save_json_for_account(JSON_BASE_DIR, acc, data, d_from, d_to)
print(f" JSON uložen do: {json_path}", flush=True)
except Exception as e:
print(f" ❌ Chyba při ukládání JSON souboru: {e}", flush=True)
# 3) Extract transactions from JSON tree
tlist = data.get("accountStatement", {}).get("transactionList", {}).get("transaction", [])
if isinstance(tlist, dict):
tlist = [tlist]
print(f" Počet transakcí v období: {len(tlist)}", flush=True)
if not tlist:
print(" Žádné transakce, jdu dál.\n", flush=True)
continue
fio_acc_id = data.get("accountStatement", {}).get("info", {}).get("accountId")
if cfg_acc_num and fio_acc_id and cfg_acc_num.split("/")[0] not in fio_acc_id:
print(
f" ⚠ Upozornění: accountId z Fio ({fio_acc_id}) "
f"se neshoduje s account_number v konfiguraci ({cfg_acc_num})",
flush=True,
)
# 4) Build list of MySQL rows
rows = []
skipped_pk_account = 0
for t in tlist:
# id_operace = Column22 (tvé "id pohybu")
id_operace_val = safe_col(t, 22)
# Pokud chybí, vygenerujeme si stabilní fallback (hash) vejde se do VARCHAR(20)
if id_operace_val is None:
fallback = generate_fallback_id(fio_acc_id or "", t)
id_operace_val = fallback
# Můžeš odkomentovat, pokud chceš vidět, kde se používá fallback
# print(f" ⚠ Fallback id_operace (hash) pro transakci: {fallback}", flush=True)
# Bez PK nemá smysl zápis jen pro jistotu, fallback by měl vše pokrýt
if id_operace_val is None:
skipped_pk_account += 1
continue
transaction_date = clean_date(safe_col(t, 0))
if not transaction_date:
# Bez data by insert stejně spadl (NOT NULL), tak to raději přeskočíme
if DEBUG_ON_ERROR:
print(f" ⚠ Přeskakuji transakci bez data, id_operace={id_operace_val}", flush=True)
skipped_pk_account += 1
continue
id_pokynu_val = safe_col(t, 19) # tvé "id pokynu" = Column19
row = {
"id_operace": str(id_operace_val),
"cislo_uctu": fio_acc_id,
"transaction_date": transaction_date,
"amount": safe_col(t, 1),
"currency": safe_col(t, 14),
"typ": safe_col(t, 8),
"provedl": safe_col(t, 9),
"protiucet": safe_col(t, 2),
"kod_banky": safe_col(t, 3),
"nazev_protiuctu": safe_col(t, 10),
"nazev_banky": safe_col(t, 12),
"api_bic": safe_col(t, 26),
"vs": safe_col(t, 5),
"ks": safe_col(t, 4),
"ss": safe_col(t, 6),
"zprava_pro_prijemce": safe_col(t, 16),
"uziv_identifikace": safe_col(t, 7),
"komentar": safe_col(t, 25),
"upr_objem_mena": safe_col(t, 18),
"id_pokynu": str(id_pokynu_val) if id_pokynu_val is not None else None,
"reference_platce": safe_col(t, 27),
}
rows.append(row)
if skipped_pk_account:
print(f" ⚠ Přeskočeno {skipped_pk_account} transakcí kvůli chybějícímu/invalidnímu PK nebo datu.", flush=True)
total_skipped_pk += skipped_pk_account
# 5) INSERT rows into MySQL in batches
inserted = 0
skipped_error_account = 0
for i in range(0, len(rows), BATCH_SIZE):
chunk = rows[i: i + BATCH_SIZE]
try:
cur.executemany(sql, chunk)
conn.commit()
inserted += len(chunk)
except Error as e:
print(f" ❌ Chyba při zápisu batch do DB: {e}", flush=True)
conn.rollback()
if DEBUG_ON_ERROR:
print(" ► Přecházím na per-row insert pro detail chyb...", flush=True)
for row in chunk:
try:
cur.execute(sql, row)
conn.commit()
inserted += 1
except Error as e_row:
skipped_error_account += 1
conn.rollback()
print(
f" ✗ Chybná transakce id_operace={row.get('id_operace')} "
f"datum={row.get('transaction_date')} částka={row.get('amount')} "
f"{e_row}",
flush=True,
)
elapsed = time.time() - t0
total_inserted += inserted
total_skipped_error += skipped_error_account
print(
f" ✓ Zapsáno (insert/update): {inserted} řádků do DB "
f"(přeskočeno chybějící PK/dat {skipped_pk_account}, chybou insertu {skipped_error_account}) "
f"za {elapsed:.2f} s\n",
flush=True,
)
# Close DB
cur.close()
conn.close()
total_elapsed = time.time() - start_all
print(
f"=== Hotovo. Celkem zapsáno {total_inserted} transakcí. "
f"Přeskočeno kvůli PK/datům: {total_skipped_pk}, kvůli chybě insertu: {total_skipped_error}. "
f"Celkový čas: {total_elapsed:.2f} s ===",
flush=True,
)
# ======================================================
# ENTRY POINT
# ======================================================
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,319 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
# Force UTF-8 output for Scheduled Tasks
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
FIO EXPORT SCRIPT — FINÁLNÍ VERZE S KONTROLNÍM VÝPISEM
-------------------------------------------
Skript nyní vypisuje POČET ŘÁDKŮ NAČTENÝCH Z DATABÁZE pro každý list.
Tím ověříme, zda se data ztrácí při čtení z DB, nebo až při exportu do Excelu.
"""
import mysql.connector
from mysql.connector import Error
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from datetime import datetime, date as dt_date
from decimal import Decimal
import os
import glob
import json
from typing import List, Dict, Any
# ======================================================
# CONFIGURATION
# ======================================================
# MySQL server parameters
DB_HOST = "192.168.1.76"
DB_PORT = 3307
DB_USER = "root"
DB_PASS = "Vlado9674+"
DB_NAME = "fio"
# Where to save Excel files
OUTPUT_DIR = r"Z:\Dropbox\!!!Days\Downloads Z230"
# JSON file with list of accounts (name + account_number)
ACCOUNTS_JSON = r"C:\Users\vlado\PycharmProjects\FIO\accounts.json"
# VÍCE SLOUPCŮ PŘENASTAVENO NA TEXT
TEXT_COLUMNS = [
"cislo_uctu",
"protiucet",
"kod_banky",
"vs",
"ks",
"ss",
"id_operace",
"id_pokynu"
]
# ======================================================
# REMOVE OLD EXPORT FILES (Beze změny)
# ======================================================
def cleanup_old_exports():
"""
Deletes older versions of exported XLSX files.
"""
patterns = [
os.path.join(OUTPUT_DIR, "*FIO*transaction*.xlsx"),
os.path.join(OUTPUT_DIR, "*FIO*transactions*.xlsx"),
os.path.join(OUTPUT_DIR, "*FIO_transactions*.xlsx"),
]
for pattern in patterns:
for file in glob.glob(pattern):
try:
os.remove(file)
print(f"🗑 Deleted old export: {file}")
except:
pass
# ======================================================
# CORE EXCEL FORMATTING FUNCTION (Oprava konverze typů zachována)
# ======================================================
def format_sheet(ws, rows: List[Dict[str, Any]], headers: List[str]):
"""
Applies ALL formatting rules to a worksheet:
FIX: Explicitní konverze datových typů pro OpenPyXL zachována.
"""
# -------------------------------
# 1) Format HEADER row
# -------------------------------
for col_idx in range(1, len(headers) + 1):
cell = ws.cell(row=1, column=col_idx)
cell.font = Font(bold=True)
cell.fill = PatternFill(start_color="FFFF00", fill_type="solid")
# -------------------------------
# 2) Write DATA rows (OPRAVA KONVERZE TYPŮ)
# -------------------------------
# Klíčová konverze pro Decimal/Date objekty
for row in rows:
excel_row = []
for h in headers:
val = row[h]
# 🛠️ KLÍČOVÁ OPRAVA: Konverze MySQL/Decimal objektů na nativní Python typy
if isinstance(val, Decimal):
val = float(val)
elif isinstance(val, dt_date) and not isinstance(val, datetime):
val = val.strftime("%Y-%m-%d")
# -----------------------------------------------------------
# Pro text-sensitive sloupce (ID, symboly), zapisuj ="hodnota"
if h in TEXT_COLUMNS and val is not None:
val = str(val)
excel_row.append(f'="{val}"')
else:
excel_row.append(val)
ws.append(excel_row)
# -------------------------------
# 3) Background coloring by "amount"
# -------------------------------
fill_red = PatternFill(start_color="FFFFDDDD", end_color="FFFFDDDD", fill_type="solid")
fill_green = PatternFill(start_color="FFEEFFEE", end_color="FFEEFFEE", fill_type="solid")
try:
amount_col_index = headers.index("amount") + 1
except ValueError:
amount_col_index = -1
if amount_col_index != -1:
for row_idx in range(2, len(rows) + 2):
cell_amount = ws.cell(row=row_idx, column=amount_col_index)
try:
value = float(str(cell_amount.value).strip('="'))
except:
value = 0
fill = fill_red if value < 0 else fill_green
for col_idx in range(1, len(headers) + 1):
ws.cell(row=row_idx, column=col_idx).fill = fill
# -------------------------------
# 4) Fixed column widths (22 sloupců)
# -------------------------------
fixed_widths = [
13, 14, 11, 14, 8, 14, 11, 30, 30, 25,
13, 13, 13, 35, 30, 15, 13, 30, 20, 13,
30, 20
]
if len(fixed_widths) < len(headers):
fixed_widths.extend([15] * (len(headers) - len(fixed_widths)))
for i, width in enumerate(fixed_widths, start=1):
col_letter = chr(64 + i)
ws.column_dimensions[col_letter].width = width
# -------------------------------
# 5) Add borders + alignment
# -------------------------------
thin = Side(border_style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
align_center = Alignment(horizontal="center")
total_rows = len(rows) + 1
total_cols = len(headers)
ALIGN_CENTER_COLS = ["id_operace", "transaction_date", "currency", "kod_banky", "vs", "ks", "ss"]
center_indices = [headers.index(col) + 1 for col in ALIGN_CENTER_COLS if col in headers]
for row_idx in range(1, total_rows + 1):
for col_idx in range(1, total_cols + 1):
cell = ws.cell(row=row_idx, column=col_idx)
cell.border = border
if col_idx in center_indices:
cell.alignment = align_center
ws.freeze_panes = "A2"
ws.auto_filter.ref = ws.dimensions
# ======================================================
# MAIN EXPORT PROCESS
# ======================================================
def export_fio():
print("Connecting to MySQL...")
# Connect to MySQL database
try:
conn = mysql.connector.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASS,
database=DB_NAME
)
except Error as e:
print("❌ Failed to connect:", e)
return
# Používáme dictionary=True pro získání dat jako slovník
cur = conn.cursor(dictionary=True)
# -------------------------------
# Load accounts.json
# -------------------------------
with open(ACCOUNTS_JSON, "r", encoding="utf-8") as f:
accounts = json.load(f)
# -------------------------------
# Define priority first sheets
# -------------------------------
preferred_order = [
"CZK rodina",
"CZK ordinace",
"CZK na jídlo",
"CZK TrialHelp",
"CZK maminka svojě věci"
]
accounts_sorted = []
# Step 1: add priority accounts first
for pref in preferred_order:
for acc in accounts:
if acc["name"] == pref:
accounts_sorted.append(acc)
# Step 2: add remaining accounts afterward
for acc in accounts:
if acc not in accounts_sorted:
accounts_sorted.append(acc)
# -------------------------------
# Create a new Excel workbook
# -------------------------------
wb = Workbook()
wb.remove(wb.active) # remove default empty sheet
# -------------------------------
# FIRST SHEET: ALL TRANSACTIONS
# -------------------------------
cur.execute("SELECT * FROM transactions ORDER BY transaction_date DESC")
all_rows = cur.fetchall()
if all_rows:
headers = list(all_rows[0].keys())
# Tisk počtu řádků pro "ALL"
print(f"➡ Sheet: ALL | Řádků z DB: {len(all_rows)}")
ws_all = wb.create_sheet(title="ALL")
ws_all.append(headers)
format_sheet(ws_all, all_rows, headers)
# -------------------------------
# INDIVIDUAL SHEETS PER ACCOUNT
# -------------------------------
for acc in accounts_sorted:
acc_num = acc["account_number"]
sheet_name = acc["name"][:31] # Excel sheet name limit
print(f"➡ Creating sheet: {sheet_name}", end=' | ') # Tisk názvu listu
query = f"""
SELECT *
FROM transactions
WHERE cislo_uctu = '{acc_num}'
ORDER BY transaction_date DESC
"""
cur.execute(query)
rows = cur.fetchall()
# VÝPIS POČTU ZÁZNAMŮ Z DB
print(f"Řádků z DB: {len(rows)}")
if not rows:
print(f"⚠ No data for {sheet_name}")
continue
headers = list(rows[0].keys())
ws = wb.create_sheet(title=sheet_name)
ws.append(headers)
format_sheet(ws, rows, headers)
conn.close()
# -------------------------------
# Save Excel file
# -------------------------------
cleanup_old_exports()
# File name includes timestamp
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
filename = f"{timestamp} FIO transactions.xlsx"
output_file = os.path.join(OUTPUT_DIR, filename)
wb.save(output_file)
print(f"✅ Export complete:\n{output_file}")
# ======================================================
# MAIN ENTRY POINT
# ======================================================
if __name__ == "__main__":
export_fio()

View File

@@ -0,0 +1,268 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
# UTF-8 console for Scheduled Tasks
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
import mysql.connector
from mysql.connector import Error
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
from datetime import datetime, date as dt_date
from decimal import Decimal
from pathlib import Path
# ==============================
# DELETE OLD REPORTS (OPTION C)
# ==============================
def delete_all_old_reports(directory: Path):
"""Deletes all previously generated ordinace expense reports."""
pattern = "*fio ordinace transactions.xlsx"
deleted = 0
for f in directory.glob(pattern):
try:
f.unlink()
deleted += 1
print(f"🗑 Deleted old report: {f.name}")
except Exception as e:
print(f"❌ Could not delete {f.name}: {e}")
if deleted == 0:
print(" No old reports to delete.")
else:
print(f"✓ Deleted {deleted} old reports.")
# ======================================================
# CONFIG
# ======================================================
DB = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "fio",
}
ORDINACE_ACCOUNT = "2800046620"
REPORTOVAT = {
"VZP": "1114007221",
"VOZP": "2010009091",
"ČPZP": "2054108761",
"OZP": "2070101041",
"ZPŠ": "2090309181",
"ZPMV": "2112108031",
}
OUTPUT_DIR = Path(r"z:\Dropbox\Ordinace\Reporty")
TEXT_COLUMNS = [
"cislo_uctu", "protiucet", "kod_banky",
"vs", "ks", "ss",
"id_operace", "id_pokynu"
]
# ======================================================
# FORMAT APPLYING (copied from main report)
# ======================================================
def format_sheet(ws, rows, headers):
# ---------------------- HEADER -----------------------
for col_idx in range(1, len(headers) + 1):
cell = ws.cell(row=1, column=col_idx)
cell.font = Font(bold=True)
cell.fill = PatternFill(start_color="FFFF00", fill_type="solid")
# ---------------------- DATA ROWS --------------------
for row in rows:
excel_row = []
for h in headers:
val = row[h]
# Convert MySQL data types
if isinstance(val, Decimal):
val = float(val)
elif isinstance(val, dt_date):
val = val.strftime("%Y-%m-%d")
# For certain columns, force ="text"
if h in TEXT_COLUMNS and val is not None:
excel_row.append(f'="{val}"')
else:
excel_row.append(val)
ws.append(excel_row)
# ---------------------- COLORING ---------------------
fill_red = PatternFill(start_color="FFFFDDDD", fill_type="solid")
fill_green = PatternFill(start_color="FFEEFFEE", fill_type="solid")
try:
amount_col = headers.index("amount") + 1
except ValueError:
amount_col = -1
if amount_col != -1:
for r in range(2, len(rows) + 2):
cell = ws.cell(row=r, column=amount_col)
try:
value = float(str(cell.value).strip('="'))
except:
value = 0
fill = fill_red if value < 0 else fill_green
for c in range(1, len(headers) + 1):
ws.cell(row=r, column=c).fill = fill
# ---------------------- COLUMN WIDTHS -----------------
fixed_widths = [
13, 14, 11, 14, 8, 14, 11, 30, 30, 25,
13, 13, 13, 35, 30, 15, 13, 30, 20, 13,
30, 20
]
if len(fixed_widths) < len(headers):
fixed_widths.extend([15] * (len(headers) - len(fixed_widths)))
for i, width in enumerate(fixed_widths, start=1):
letter = chr(64 + i)
ws.column_dimensions[letter].width = width
# ---------------------- BORDERS & ALIGNMENT ----------
thin = Side(border_style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
align_center = Alignment(horizontal="center")
center_cols = ["id_operace", "transaction_date", "currency", "kod_banky", "vs", "ks", "ss"]
center_indices = [headers.index(c) + 1 for c in center_cols if c in headers]
total_rows = len(rows) + 1
total_cols = len(headers)
for r in range(1, total_rows + 1):
for c in range(1, total_cols + 1):
cell = ws.cell(row=r, column=c)
cell.border = border
if c in center_indices:
cell.alignment = align_center
ws.freeze_panes = "A2"
ws.auto_filter.ref = ws.dimensions
# ======================================================
# EXPORT
# ======================================================
def export_ordinace():
print("Connecting MySQL...")
conn = mysql.connector.connect(**DB)
cur = conn.cursor(dictionary=True)
# ============================
# Load ALL transactions for ordinace
# ============================
sql_all = f"""
SELECT *
FROM transactions
WHERE cislo_uctu = '{ORDINACE_ACCOUNT}'
ORDER BY transaction_date DESC;
"""
cur.execute(sql_all)
all_rows = cur.fetchall()
if not all_rows:
print("❌ No transactions found for ordinace account.")
return
headers = list(all_rows[0].keys())
# Workbook
wb = Workbook()
wb.remove(wb.active)
# --------------------- ALL sheet ---------------------
ws_all = wb.create_sheet("ALL ordinace")
ws_all.append(headers)
format_sheet(ws_all, all_rows, headers)
print(f"➡ ALL ordinace rows: {len(all_rows)}")
# --------------------- INSURANCE sheets ---------------
summary = []
for name, acc in REPORTOVAT.items():
print(f"➡ Pojišťovna {name} ({acc})")
sql = f"""
SELECT *
FROM transactions
WHERE cislo_uctu = '{ORDINACE_ACCOUNT}'
AND (
protiucet <> '2070101041'
OR (protiucet = '2070101041' AND amount > 0)
)
AND protiucet = '{acc}'
ORDER BY transaction_date DESC;
"""
cur.execute(sql)
rows = cur.fetchall()
count = len(rows)
summa = sum(float(r["amount"]) for r in rows) if rows else 0
summary.append({
"Pojišťovna": name,
"Účet": acc,
"Počet transakcí": count,
"Součet": summa
})
if not rows:
print(f" ⚠ No rows")
continue
ws = wb.create_sheet(name)
ws.append(headers)
format_sheet(ws, rows, headers)
print(f"{count} rows, sum {summa:.2f}")
# --------------------- SUMMARY sheet -----------------
ws_s = wb.create_sheet("Přehled")
ws_s.append(["Pojišťovna", "Účet", "Počet transakcí", "Součet Kč"])
for row in summary:
ws_s.append([
row["Pojišťovna"],
row["Účet"],
row["Počet transakcí"],
f"{row['Součet']:.2f}"
])
# ===========================
# Save Excel
# ===========================
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
out_file = OUTPUT_DIR / f"{timestamp} FIO ordinace transactions.xlsx"
wb.save(out_file)
print(f"\n✅ Export hotový:\n{out_file}")
# ======================================================
# MAIN
# ======================================================
if __name__ == "__main__":
delete_all_old_reports(OUTPUT_DIR)
export_ordinace()

View File

@@ -0,0 +1,169 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Expenses Ordinace Report Generator (2025)
------------------------------------------
Reads JSON with tab definitions + SQL queries.
Creates one Excel workbook with multiple sheets.
Uniform formatting for all tabs.
Deletes old reports before saving the new one.
"""
import json
import pandas as pd
import pymysql
from datetime import datetime
from pathlib import Path
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, PatternFill, Border, Side
from openpyxl.utils.dataframe import dataframe_to_rows
# ==============================
# CONFIG
# ==============================
JSON_TABS = r"expenses_tabs.json"
MYSQL = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "fio",
"charset": "utf8mb4"
}
EXPORT_DIR = Path(r"z:\Dropbox\Ordinace\Reporty")
EXPORT_DIR.mkdir(exist_ok=True, parents=True)
# ==============================
# DELETE OLD REPORTS (OPTION C)
# ==============================
def delete_all_old_reports(directory: Path):
"""Deletes all previously generated ordinace expense reports."""
pattern = "*fio ordinace expenses.xlsx"
deleted = 0
for f in directory.glob(pattern):
try:
f.unlink()
deleted += 1
print(f"🗑 Deleted old report: {f.name}")
except Exception as e:
print(f"❌ Could not delete {f.name}: {e}")
if deleted == 0:
print(" No old reports to delete.")
else:
print(f"✓ Deleted {deleted} old reports.")
# ==============================
# FORMATTING HELPERS
# ==============================
def format_sheet(ws):
"""Apply column widths, header styling, borders, autofilter."""
# Yellow header (Option A)
header_fill = PatternFill("solid", fgColor="FFF200")
bold_font = Font(bold=True, color="000000")
center_align = Alignment(horizontal="center", vertical="center")
thin = Side(border_style="thin", color="000000")
border = Border(left=thin, right=thin, top=thin, bottom=thin)
# Autofilter
if ws.max_row > 1:
ws.auto_filter.ref = ws.dimensions
# Auto column widths
for col in ws.columns:
max_len = 0
letter = col[0].column_letter
for cell in col:
try:
max_len = max(max_len, len(str(cell.value)))
except Exception:
pass
ws.column_dimensions[letter].width = min(max_len + 2, 50)
# Style header row
for cell in ws[1]:
cell.font = bold_font
cell.fill = header_fill
cell.alignment = center_align
cell.border = border
# Border for all body cells
for row in ws.iter_rows(min_row=2):
for cell in row:
cell.border = border
# ==============================
# MAIN
# ==============================
def main():
print("=== Expenses Ordinace Report (with cleanup) ===")
# Load JSON tabs
with open(JSON_TABS, "r", encoding="utf-8") as f:
config = json.load(f)
tabs = config.get("tabs", [])
print(f"Loaded {len(tabs)} tab definitions.")
# Connect DB
conn = pymysql.connect(**MYSQL)
# Prepare workbook
wb = Workbook()
wb.remove(wb.active)
# Process each tab
for tab in tabs:
name = tab["name"]
sql = tab["sql"]
print(f"→ Running tab: {name}")
df = pd.read_sql(sql, conn)
df = df.fillna("")
# Swap columns N (index 13) and O (index 14)
cols = df.columns.tolist()
if len(cols) >= 15:
cols[13], cols[14] = cols[14], cols[13]
df = df[cols]
# Create sheet
sheet_name = name[:31]
ws = wb.create_sheet(sheet_name)
# Write DataFrame
for row in dataframe_to_rows(df, index=False, header=True):
ws.append(row)
# Apply formatting
format_sheet(ws)
conn.close()
# Delete older reports
delete_all_old_reports(EXPORT_DIR)
# Save new report
OUTFILE = EXPORT_DIR / f"{datetime.now():%Y-%m-%d %H-%M-%S} FIO ordinace expenses.xlsx"
wb.save(OUTFILE)
print(f"\n✔ Report generated:\n{OUTFILE}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,262 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
import time
import json
from pathlib import Path
import hashlib
import mysql.connector
from mysql.connector import Error
# ====================================================================
# UTF-8 OUTPUT
# ====================================================================
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
# ====================================================================
# CONFIGURATION
# ====================================================================
BASE_DIR = Path(r"z:\Dropbox\!!!Days\Downloads Z230\Fio")
DB = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "fio",
"charset": "utf8mb4",
}
BATCH_SIZE = 500
DEBUG_ON_ERROR = True
# ====================================================================
# HELPERS
# ====================================================================
def safe_col(t: dict, n: int):
key = f"column{n}"
val = t.get(key)
return val.get("value") if val else None
def clean_date(dt_str: str):
if not dt_str:
return None
return dt_str[:10]
def generate_fallback_id(account_id: str, t: dict) -> str:
raw_date = clean_date(safe_col(t, 0)) or ""
amount = str(safe_col(t, 1) or "")
protiucet = str(safe_col(t, 2) or "")
vs = str(safe_col(t, 5) or "")
src = f"{account_id}|{raw_date}|{amount}|{protiucet}|{vs}"
digest = hashlib.sha1(src.encode("utf-8")).hexdigest()
return digest[:20]
def load_json_file(p: Path):
try:
with open(p, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f" ❌ Nelze načíst JSON: {p}{e}")
return None
# ====================================================================
# MAIN
# ====================================================================
def main():
start_all = time.time()
print("=== Fio HISTORICKÝ IMPORT (ze všech JSON na disku) ===\n", flush=True)
print(f"Hledám JSON soubory v: {BASE_DIR}", flush=True)
# Najdeme všechny JSONy ve všech podadresářích
all_json_paths = list(BASE_DIR.rglob("*.json"))
print(f"Nalezeno JSON souborů: {len(all_json_paths)}\n", flush=True)
if not all_json_paths:
print("Nenalezeny žádné JSON soubory. Konec.")
return
# DB připojení
try:
conn = mysql.connector.connect(
host=DB["host"],
port=DB["port"],
user=DB["user"],
password=DB["password"],
database=DB["database"],
charset=DB["charset"]
)
cur = conn.cursor()
except Error as e:
print(f"FATAL DB ERROR: {e}")
return
sql = """
INSERT INTO transactions
(
cislo_uctu, id_operace, transaction_date, amount, currency,
protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ,
vs, ks, ss, uziv_identifikace, zprava_pro_prijemce,
provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce
)
VALUES
(
%(cislo_uctu)s, %(id_operace)s, %(transaction_date)s, %(amount)s, %(currency)s,
%(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s,
%(vs)s, %(ks)s, %(ss)s, %(uziv_identifikace)s, %(zprava_pro_prijemce)s,
%(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s, %(reference_platce)s
)
ON DUPLICATE KEY UPDATE
transaction_date = VALUES(transaction_date),
amount = VALUES(amount),
currency = VALUES(currency),
protiucet = VALUES(protiucet),
kod_banky = VALUES(kod_banky),
nazev_protiuctu = VALUES(nazev_protiuctu),
nazev_banky = VALUES(nazev_banky),
typ = VALUES(typ),
vs = VALUES(vs),
ks = VALUES(ks),
ss = VALUES(ss),
uziv_identifikace = VALUES(uziv_identifikace),
zprava_pro_prijemce = VALUES(zprava_pro_prijemce),
provedl = VALUES(provedl),
id_pokynu = VALUES(id_pokynu),
komentar = VALUES(komentar),
upr_objem_mena = VALUES(upr_objem_mena),
api_bic = VALUES(api_bic),
reference_platce = VALUES(reference_platce)
"""
total_processed_files = 0
total_rows_inserted = 0
total_rows_skipped = 0
# ============================================
# PROCES JSON SOUBOR PO SOUBORU
# ============================================
for p in all_json_paths:
total_processed_files += 1
print(f"--- Soubor {total_processed_files}/{len(all_json_paths)}: {p}", flush=True)
data = load_json_file(p)
if not data:
continue
# JSON má strukturu jako při fetchnutí z API
account_info = data.get("accountStatement", {}).get("info", {})
account_id = account_info.get("accountId")
if not account_id:
print(" ⚠ Nelze zjistit cislo_uctu z JSON! Přeskakuji.")
continue
tlist = data.get("accountStatement", {}).get("transactionList", {}).get("transaction", [])
if isinstance(tlist, dict):
tlist = [tlist]
print(f" Počet transakcí: {len(tlist)}", flush=True)
if not tlist:
continue
rows = []
skipped_local = 0
# Převést transakce → DB řádky
for t in tlist:
id_operace_val = safe_col(t, 22)
if id_operace_val is None:
id_operace_val = generate_fallback_id(account_id, t)
transaction_date = clean_date(safe_col(t, 0))
if not transaction_date:
skipped_local += 1
continue
id_pokynu_val = safe_col(t, 19)
row = {
"cislo_uctu": account_id,
"id_operace": str(id_operace_val),
"transaction_date": transaction_date,
"amount": safe_col(t, 1),
"currency": safe_col(t, 14),
"protiucet": safe_col(t, 2),
"kod_banky": safe_col(t, 3),
"nazev_protiuctu": safe_col(t, 10),
"nazev_banky": safe_col(t, 12),
"api_bic": safe_col(t, 26),
"typ": safe_col(t, 8),
"provedl": safe_col(t, 9),
"vs": safe_col(t, 5),
"ks": safe_col(t, 4),
"ss": safe_col(t, 6),
"zprava_pro_prijemce": safe_col(t, 16),
"uziv_identifikace": safe_col(t, 7),
"komentar": safe_col(t, 25),
"upr_objem_mena": safe_col(t, 18),
"id_pokynu": str(id_pokynu_val) if id_pokynu_val else None,
"reference_platce": safe_col(t, 27),
}
rows.append(row)
total_rows_skipped += skipped_local
print(f" Přeskočeno transakcí bez data/PK: {skipped_local}")
# Batch insert
inserted = 0
for i in range(0, len(rows), BATCH_SIZE):
chunk = rows[i: i + BATCH_SIZE]
try:
cur.executemany(sql, chunk)
conn.commit()
inserted += len(chunk)
except Error as e:
print(f" ❌ Batch insert error: {e}")
conn.rollback()
if DEBUG_ON_ERROR:
print(" ► Per-row insert for debugging…")
for row in chunk:
try:
cur.execute(sql, row)
conn.commit()
inserted += 1
except Error as e_row:
conn.rollback()
print(f" ✗ Chyba transakce id_operace={row['id_operace']}{e_row}")
total_rows_inserted += inserted
print(f" ✓ Zapsáno/aktualizováno: {inserted}")
# ======================
# ZÁVĚR
# ======================
cur.close()
conn.close()
elapsed = time.time() - start_all
print("\n===== HOTOVO =====", flush=True)
print(f"Souborů zpracováno: {total_processed_files}")
print(f"Transakcí zapsáno/aktualizováno: {total_rows_inserted}")
print(f"Transakcí přeskočeno: {total_rows_skipped}")
print(f"Celkový čas: {elapsed:.2f} s")
print("==================", flush=True)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,354 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
import time
from datetime import date, timedelta
from pathlib import Path
import json
import requests
import mysql.connector
from mysql.connector import Error
from typing import Dict, Any, List
# ====================================================================
# A. PONECHÁNO: Vynucení UTF-8 pro správnou diakritiku v plánovaných úlohách
# ====================================================================
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
"""
FIO MULTIACCOUNT IMPORTER — KONEČNÁ VERZE S AUDITEM
============================================================
Přidána zpětná kontrola (audit) po každé dávce, která ověřuje,
že se ID transakcí skutečně zapsaly do databáze.
"""
# =========================================
# CONFIGURATION
# =========================================
# JSON file containing multiple account configs:
# [
#    { "name": "CZK rodina", "account_number": "2100046291", "token": "xxx" },
#    ...
# ]
ACCOUNTS_FILE = r"/accounts.json"
# Directory where raw JSON files from Fio API will be stored.
JSON_BASE_DIR = r"z:\Dropbox\!!!Days\Downloads Z230\Fio"
# MySQL connection parameters
DB = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "fio",
"charset": "utf8mb4",
}
# How many transactions insert per batch (performance tuning)
BATCH_SIZE = 500
# How many days back we load from Fio (default = last 90 days)
DAYS_BACK = 90
# =========================================
# HELPERS (Beze změny)
# =========================================
def load_accounts(path: str) -> List[Dict[str, str]]:
"""Reads accounts.json and validates content."""
with open(path, "r", encoding="utf-8") as f:
accounts = json.load(f)
for acc in accounts:
for key in ("name", "account_number", "token"):
if key not in acc:
raise ValueError(f"Missing '{key}' in account config: {acc}")
return accounts
def fio_url_for_period(token: str, d_from: date, d_to: date) -> str:
"""Constructs the URL for Fio REST API periods endpoint."""
from_str = d_from.strftime("%Y-%m-%d")
to_str = d_to.strftime("%Y-%m-%d")
return f"https://fioapi.fio.cz/v1/rest/periods/{token}/{from_str}/{to_str}/transactions.json"
def fetch_fio_json(token: str, d_from: date, d_to: date) -> Any:
"""Calls Fio API and fetches JSON."""
url = fio_url_for_period(token, d_from, d_to)
resp = requests.get(url, timeout=30)
if resp.status_code != 200:
print(f"  ❌ HTTP {resp.status_code} from Fio: {url}", flush=True)
return None
try:
return resp.json()
except json.JSONDecodeError:
print("  ❌ Cannot decode JSON from Fio response", flush=True)
return None
def safe_col(t: dict, n: int) -> Any:
"""SAFE ACCESSOR for Fio transaction column numbers (ColumnN)."""
key = f"column{n}"
val = t.get(key)
if not val:
return None
return val.get("value")
def clean_date(dt_str: str) -> str:
"""Strips timezone from Fio date string ("YYYY-MM-DD+HH:MM") → "YYYY-MM-DD"."""
if not dt_str:
return None
return str(dt_str)[:10]
def ensure_dir(path: Path):
"""Creates directory if it doesnt exist."""
path.mkdir(parents=True, exist_ok=True)
def save_json_for_account(base_dir: str, account_cfg: dict, data: dict, d_from: date, d_to: date) -> Path:
"""Saves raw JSON to disk."""
acc_num_raw = account_cfg["account_number"]
acc_folder_name = acc_num_raw.replace("/", "_")
out_dir = Path(base_dir) / acc_folder_name
ensure_dir(out_dir)
filename = f"{d_from.strftime('%Y-%m-%d')}_to_{d_to.strftime('%Y-%m-%d')}.json"
out_path = out_dir / filename
with open(out_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return out_path
# =========================================
# MAIN IMPORT LOGIC
# =========================================
def main():
start_all = time.time()
# Calculate time range (last N days)
today = date.today()
d_from = today - timedelta(days=DAYS_BACK)
d_to = today
print("=== Fio multi-account import v2 (NOVÝ KONEKTOR + AUDIT) ===", flush=True)
print(f"Období: {d_from}{d_to}", flush=True)
# Load all accounts from accounts.json
try:
accounts = load_accounts(ACCOUNTS_FILE)
except Exception as e:
print(f"FATÁLNÍ CHYBA při načítání účtů: {e}", flush=True)
return
print(f"  Účtů v konfiguraci: {len(accounts)}\n", flush=True)
# ZMĚNA: Připojení pomocí mysql.connector
try:
conn = mysql.connector.connect(
host=DB["host"],
port=DB["port"],
user=DB["user"],
password=DB["password"],
database=DB["database"],
charset=DB["charset"]
)
cur = conn.cursor()
except Error as e:
print(f"FATÁLNÍ CHYBA při připojení k DB: {e}", flush=True)
return
# SQL INSERT dotaz přizpůsobený nové DB struktuře
sql = """
INSERT INTO transactions
(id_operace, cislo_uctu, transaction_date, amount, currency, \
protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ, \
vs, ks, ss, uziv_identifikace, zprava_pro_prijemce, \
provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce)
VALUES (%(id_operace)s, %(cislo_uctu)s, %(transaction_date)s, %(amount)s, %(currency)s,
%(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s,
%(vs)s, %(ks)s, %(ss)s, %(uziv_identifikace)s, %(zprava_pro_prijemce)s,
%(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s, \
%(reference_platce)s) ON DUPLICATE KEY \
UPDATE \
cislo_uctu = \
VALUES (cislo_uctu), transaction_date = \
VALUES (transaction_date), amount = \
VALUES (amount), currency = \
VALUES (currency), protiucet = \
VALUES (protiucet), kod_banky = \
VALUES (kod_banky), nazev_protiuctu = \
VALUES (nazev_protiuctu), nazev_banky = \
VALUES (nazev_banky), typ = \
VALUES (typ), vs = \
VALUES (vs), ks = \
VALUES (ks), ss = \
VALUES (ss), uziv_identifikace = \
VALUES (uziv_identifikace), zprava_pro_prijemce = \
VALUES (zprava_pro_prijemce), provedl = \
VALUES (provedl), id_pokynu = \
VALUES (id_pokynu), komentar = \
VALUES (komentar), upr_objem_mena = \
VALUES (upr_objem_mena), api_bic = \
VALUES (api_bic), reference_platce = \
VALUES (reference_platce) \
"""
total_inserted = 0
# ======================================================
# PROCESS EACH ACCOUNT IN accounts.json
# ======================================================
for acc in accounts:
name = acc["name"]
cfg_acc_num = acc["account_number"]
token = acc["token"]
print(f"--- Účet: {name} ({cfg_acc_num}) ---", flush=True)
t0 = time.time()
# --- 1) Download JSON from Fio API
data = fetch_fio_json(token, d_from, d_to)
if data is None:
print("  Přeskakuji, žádná data / chyba API.\n", flush=True)
continue
# --- 2) Save raw JSON file to disk
try:
json_path = save_json_for_account(JSON_BASE_DIR, acc, data, d_from, d_to)
print(f"  JSON uložen do: {json_path}", flush=True)
except Exception as e:
print(f"  ❌ Chyba při ukládání JSON souboru: {e}", flush=True)
pass
# --- 3) Extract transactions from JSON tree
tlist = data.get("accountStatement", {}).get("transactionList", {}).get("transaction", [])
if isinstance(tlist, dict):
tlist = [tlist]
print(f"  Počet transakcí v období: {len(tlist)}", flush=True)
if not tlist:
print("  Žádné transakce, jdu dál.\n", flush=True)
continue
fio_acc_id = data.get("accountStatement", {}).get("info", {}).get("accountId")
if cfg_acc_num and fio_acc_id and cfg_acc_num.split("/")[0] not in fio_acc_id:
print(f"  ⚠ Upozornění: accountId z Fio ({fio_acc_id}) "
f"se neshoduje s account_number v konfiguraci ({cfg_acc_num})", flush=True)
# --- 4) Build list of MySQL rows (MAPOVÁNÍ S KONVERZÍ NA STR)
rows = []
for t in tlist:
id_operace_val = safe_col(t, 22)
id_pokynu_val = safe_col(t, 17)
row = {
"id_operace": str(id_operace_val) if id_operace_val is not None else None,
"cislo_uctu": fio_acc_id,
"transaction_date": clean_date(safe_col(t, 0)),
"amount": safe_col(t, 1),
"currency": safe_col(t, 14),
"typ": safe_col(t, 8),
"provedl": safe_col(t, 9),
"protiucet": safe_col(t, 2),
"kod_banky": safe_col(t, 3),
"nazev_protiuctu": safe_col(t, 10),
"nazev_banky": safe_col(t, 12),
"api_bic": safe_col(t, 26),
"vs": safe_col(t, 5),
"ks": safe_col(t, 4),
"ss": safe_col(t, 6),
"zprava_pro_prijemce": safe_col(t, 16),
"uziv_identifikace": safe_col(t, 7),
"komentar": safe_col(t, 25),
"upr_objem_mena": safe_col(t, 18),
"id_pokynu": str(id_pokynu_val) if id_pokynu_val is not None else None,
"reference_platce": safe_col(t, 27),
}
rows.append(row)
# --- 5) INSERT rows into MySQL in batches S AUDITEM
inserted = 0
actual_inserted_count = 0 # Počet řádků potvrzených auditem
for i in range(0, len(rows), BATCH_SIZE):
chunk = rows[i: i + BATCH_SIZE]
# Získej ID transakcí z aktuální dávky pro audit
chunk_ids = [row["id_operace"] for row in chunk if row["id_operace"] is not None]
try:
# Krok 5.1: Provedení zápisu
cur.executemany(sql, chunk)
conn.commit()
# --- Krok 5.2: AUDIT: Zpětná kontrola ---
if chunk_ids:
# Vytvoření stringu ID pro SQL dotaz: ('id1', 'id2', ...)
id_string = ', '.join([f"'{i}'" for i in chunk_ids])
audit_query = f"SELECT COUNT(*) FROM transactions WHERE id_operace IN ({id_string})"
cur.execute(audit_query)
# ZMĚNA: Používáme fetchone()[0] pro mysql.connector
found_count = cur.fetchone()[0]
if found_count != len(chunk):
print(
f"  ⚠ AUDIT SELHAL: Zapsáno {len(chunk)}, ale v databázi nalezeno jen {found_count} pro tuto dávku!",
flush=True)
# Pokud audit selhal, tiskneme, která ID chybí (pro debug)
# To je složitější, ale alespoň víme, kolik jich chybí.
actual_inserted_count += found_count
else:
# Pokud dávka neobsahuje ID (což by neměla, ale pro jistotu)
actual_inserted_count += len(chunk)
inserted += len(chunk)
except Error as e:
print(f"  ❌ Chyba při zápisu do DB: {e}", flush=True)
conn.rollback()
break # Přerušíme cyklus, pokud nastane kritická chyba
elapsed = time.time() - t0
total_inserted += actual_inserted_count # Sčítáme počet potvrzený auditem
print(f"  ✓ Zapsáno (potvrzeno auditem): {actual_inserted_count} řádků do DB za {elapsed:.2f} s\n", flush=True)
# Nyní by měl tento počet odpovídat počtu transakcí pro daný účet, pokud nebyly žádné duplicity.
# Close DB
cur.close()
conn.close()
total_elapsed = time.time() - start_all
print(f"=== Hotovo. Celkem zapsáno (potvrzeno auditem) {total_inserted} transakcí. "
f"Celkový čas: {total_elapsed:.2f} s ===", flush=True)
# ======================================================
# ENTRY POINT
# ======================================================
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,222 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import io
import time
import json
# ZMĚNA: Importujeme oficiální konektor
import mysql.connector
from mysql.connector import Error
from typing import Dict, Any, List
from pathlib import Path
from datetime import date
# ====================================================================
# A. PONECHÁNO: Vynucení UTF-8 pro správnou diakritiku
# ====================================================================
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
# =========================================
# KONFIGURACE PRO TEST
# =========================================
TEST_JSON_PATH = r"z:\Dropbox\!!!Days\Downloads Z230\Fio\2100074583\2025-09-06_to_2025-12-05.json"
TEST_ACCOUNT_ID = "2100074583"
# MySQL connection parameters
DB = {
"host": "192.168.1.76",
"port": 3307,
"user": "root",
"password": "Vlado9674+",
"database": "fio",
"charset": "utf8", # mysql-connector používá 'utf8' nebo 'utf8mb4'
}
# =========================================
# POMOCNÉ FUNKCE (Z vašeho původního kódu)
# =========================================
def safe_col(t: dict, n: int) -> Any:
"""SAFE ACCESSOR for Fio transaction column numbers (ColumnN)."""
key = f"column{n}"
val = t.get(key)
if not val:
return None
return val.get("value")
def clean_date(dt_str: str) -> str:
"""Strips timezone from Fio date string ("YYYY-MM-DD+HH:MM") → "YYYY-MM-DD"."""
if not dt_str:
return None
return str(dt_str)[:10]
def load_test_data(path: str) -> Any:
"""Načte data přímo ze souboru JSON."""
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"FATÁLNÍ CHYBA: Nelze načíst JSON soubor z {path}: {e}", flush=True)
return None
# =========================================
# HLAVNÍ TESTOVACÍ LOGIKA
# =========================================
def run_test():
start_all = time.time()
print("=== TEST VKLÁDÁNÍ DAT Z JSON SOUBORU (OPRAVA S MYSQL-CONNECTOR) ===", flush=True)
print(f"Zdroj dat: {TEST_JSON_PATH}", flush=True)
# KROK 1: Načtení dat přímo z JSON souboru
data = load_test_data(TEST_JSON_PATH)
if data is None:
return
# KROK 2: Extrakce transakcí
tlist = data.get("accountStatement", {}).get("transactionList", {}).get("transaction", [])
if isinstance(tlist, dict):
tlist = [tlist]
fio_acc_id = data.get("accountStatement", {}).get("info", {}).get("accountId")
expected_count = len(tlist)
print(f"Očekávaný počet transakcí v JSON: {expected_count}", flush=True)
if expected_count == 0:
print("Test přeskočen, JSON je prázdný.", flush=True)
return
# KROK 3: Připojení k DB
try:
# ZMĚNA: Používáme mysql.connector
conn = mysql.connector.connect(
host=DB["host"],
port=DB["port"],
user=DB["user"],
password=DB["password"],
database=DB["database"],
charset=DB["charset"]
)
cur = conn.cursor()
# ZMĚNA: Používáme dictionary=True, což je zde podporováno
cur_dict = conn.cursor(dictionary=True)
except Error as e:
print(f"FATÁLNÍ CHYBA: Nelze se připojit k DB: {e}", flush=True)
return
# KROK 4: Sestavení SQL dotazu a řádků (Konverze ID na string zůstává pro bezpečnost)
sql = """
INSERT INTO transactions
(id_operace, cislo_uctu, transaction_date, amount, currency, \
protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ, \
vs, ks, ss, uziv_identifikace, zprava_pro_prijemce, \
provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce)
VALUES (%(id_operace)s, %(cislo_uctu)s, %(transaction_date)s, %(amount)s, %(currency)s,
%(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s,
%(vs)s, %(ks)s, %(ss)s, %(uziv_identifikace)s, %(zprava_pro_prijemce)s,
%(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s, \
%(reference_platce)s) ON DUPLICATE KEY \
UPDATE \
cislo_uctu = \
VALUES (cislo_uctu), transaction_date = \
VALUES (transaction_date), amount = \
VALUES (amount), currency = \
VALUES (currency), protiucet = \
VALUES (protiucet), kod_banky = \
VALUES (kod_banky), nazev_protiuctu = \
VALUES (nazev_protiuctu), nazev_banky = \
VALUES (nazev_banky), typ = \
VALUES (typ), vs = \
VALUES (vs), ks = \
VALUES (ks), ss = \
VALUES (ss), uziv_identifikace = \
VALUES (uziv_identifikace), zprava_pro_prijemce = \
VALUES (zprava_pro_prijemce), provedl = \
VALUES (provedl), id_pokynu = \
VALUES (id_pokynu), komentar = \
VALUES (komentar), upr_objem_mena = \
VALUES (upr_objem_mena), api_bic = \
VALUES (api_bic), reference_platce = \
VALUES (reference_platce) \
"""
rows = []
for t in tlist:
id_operace_val = safe_col(t, 22)
id_pokynu_val = safe_col(t, 17)
row = {
# Klíčová konverze ID na string (VARCHAR)
"id_operace": str(id_operace_val) if id_operace_val is not None else None,
"cislo_uctu": fio_acc_id,
"transaction_date": clean_date(safe_col(t, 0)),
"amount": safe_col(t, 1),
"currency": safe_col(t, 14),
"typ": safe_col(t, 8),
"provedl": safe_col(t, 9),
"protiucet": safe_col(t, 2),
"kod_banky": safe_col(t, 3),
"nazev_protiuctu": safe_col(t, 10),
"nazev_banky": safe_col(t, 12),
"api_bic": safe_col(t, 26),
"vs": safe_col(t, 5),
"ks": safe_col(t, 4),
"ss": safe_col(t, 6),
"zprava_pro_prijemce": safe_col(t, 16),
"uziv_identifikace": safe_col(t, 7),
"komentar": safe_col(t, 25),
"upr_objem_mena": safe_col(t, 18),
"id_pokynu": str(id_pokynu_val) if id_pokynu_val is not None else None,
"reference_platce": safe_col(t, 27),
}
rows.append(row)
# KROK 5: Vložení dat do DB
inserted = 0
try:
# Používáme executemany s připravenými daty
cur.executemany(sql, rows)
conn.commit()
inserted = len(rows)
except Error as e:
print(f"  ❌ Chyba při VKLÁDÁNÍ do DB: {e}", flush=True)
conn.rollback()
# KROK 6: Kontrola výsledku v DB (používá Dictionary=True)
check_query = f"SELECT count(*) AS count FROM transactions WHERE cislo_uctu = '{fio_acc_id}'"
cur_dict.execute(check_query)
current_db_count = cur_dict.fetchone()['count']
conn.close()
print(f"\n--- SHRNUTÍ TESTU ---", flush=True)
print(f"Očekávalo se vložení/aktualizace řádků: {expected_count}", flush=True)
print(f"Počet řádků zpracovaných skriptem: {inserted}", flush=True)
print(f"Aktuální počet záznamů pro účet {fio_acc_id} v DB: {current_db_count}", flush=True)
print(f"Celkový čas: {time.time() - start_all:.2f} s", flush=True)
if inserted == expected_count and current_db_count >= expected_count:
print("✅ TEST ÚSPĚŠNÝ: Všechny transakce byly vloženy/aktualizovány, nebo DB obsahuje očekávaný počet.",
flush=True)
else:
print("🔥 TEST SELHAL: Existuje nesoulad mezi očekávaným a skutečným počtem záznamů.", flush=True)
# ======================================================
# ENTRY POINT
# ======================================================
if __name__ == "__main__":
run_test()

View File

@@ -0,0 +1,53 @@
[
{
"name": "EUR tatínek 1",
"account_number": "2100074583",
"token": "GuV2Boaulx56ZiQUqUArgg6P9qdfEVKOoH6wF3PfAZ0fPS01r2WbiNiCsCcIBZ0U"
},
{
"name": "CZK rodina",
"account_number": "2100046291",
"token": "v0GJaAVeefzV1lnx1jPCf2nFF7SuOPzzrL5tobPNsC7oCChXG4hahDYVb8Rdcex0"
},
{
"name": "EUR TrialHelp",
"account_number": "2200787265",
"token": "9yG5g6lHWGS6YU2R2petm5DRYTb9orhJ8VPJ0p7RtTjlIo2vB83ynBlPCMGRIwzy"
},
{
"name": "CZK tatínek",
"account_number": "2400046293",
"token": "j2qmpvWe4RfKtBTBlhwC1VFED7HJlVAe23iPBH1TWis9htEyYe8fRejcMeSxOLqC"
},
{
"name": "CHF tatínek",
"account_number": "2402161017",
"token": "aNfK9iu6qIPlugGCR6gvSJ7NXtTkDfVVj8fBz4X1pORuGKf6VXjWin4wrr9WRjSd"
},
{
"name": "EUR tatínek 2",
"account_number": "2500074582",
"token": "aLsl9ETRUU1IgoYeinAzYWyruIoJvs6UvJKTGRlJcm7HaEc5ojsFdxJizyT9lREO"
},
{
"name": "CZK TrialHelp",
"account_number": "2900046548",
"token": "pKZVHbFDVsbTa8ryEaVc6A2nyrlb4TbT1tCiimieesHvhKFoJmYBRVjCpnvjiUUK"
},
{
"name": "CZK maminka svojě věci",
"account_number": "2003310572",
"token": "TkrRvnMK77OSSYdVulNvZcT6ltWcmjqkp3RN5WYwnBpNTuaKCWO1zHKOlDGAiNyv"
},
{
"name": "CZK na jídlo",
"account_number": "2403310563",
"token": "axRvFxu4VCzsDp5QZXN8LQ0fQUqzV2FEBZrM595x3Rtp10zowRBcGOFs9uNNPb7Q"
},
{
"name": "CZK ordinace",
"account_number": "2800046620",
"token": "Xzdr3eK7se7ZgeE3JujgeidGb0WrB7mGQ6HSOiBJzWi0kPURYKRpkRKB3ZOpt3rq"
}
]

View File

@@ -0,0 +1,40 @@
{
"tabs": [
{
"name": "All expenses",
"sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND amount < 0 AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;"
},
{
"name": "Vakciny Ptacek",
"sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '220205630' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;"
},
{
"name": "Poliklinika Prosek",
"sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '1387720540' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;"
},
{
"name": "Card payments",
"sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet IS NULL AND typ = 'Platba kartou' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;"
},
{
"name": "Vakciny Avenier",
"sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '5050012811' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;"
},
{
"name": "Socialka",
"sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '1011-7926201' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;"
},
{
"name": "Zdravotka",
"sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '1112001221' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;"
},
{
"name": "MEDIPOS",
"sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '14309711' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;"
},
{
"name": "MEDEVIO",
"sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '2701907026' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;"
}
]
}