diff --git a/#10 Download reports/10 Read.py b/#10 Download reports/10 Read.py new file mode 100644 index 0000000..42d459d --- /dev/null +++ b/#10 Download reports/10 Read.py @@ -0,0 +1,400 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import sys +import io +import time +from datetime import date, timedelta +from pathlib import Path +import json +import requests +import mysql.connector +from mysql.connector import Error +from typing import Dict, Any, List +import hashlib + +# ==================================================================== +# A. Vynucení UTF-8 pro správnou diakritiku v plánovaných úlohách +# ==================================================================== +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') +sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') + +""" +FIO MULTI–ACCOUNT IMPORTER — VERZE S ROBUSTNĚJŠÍM HANDLINGEM PK +=============================================================== +- mysql.connector (Oracle) pro stabilní manipulaci s datovými typy +- Bezpečné generování id_operace, pokud chybí Column22 +- Správné mapování id_pokynu = Column19 +- Detailní logování chybných řádků +""" + +# ========================================= +# CONFIGURATION +# ========================================= + +ACCOUNTS_FILE = r"c:\users\vlado\PycharmProjects\FIO\accounts.json" + +JSON_BASE_DIR = r"z:\Dropbox\!!!Days\Downloads Z230\Fio" + +DB = { + "host": "192.168.1.76", + "port": 3307, + "user": "root", + "password": "Vlado9674+", + "database": "fio", + "charset": "utf8mb4", +} + +BATCH_SIZE = 500 +DAYS_BACK = 90 + +# Zapíná detailnější logování při chybách insertu +DEBUG_ON_ERROR = True + + +# ========================================= +# HELPERS +# ========================================= + +def load_accounts(path: str) -> List[Dict[str, str]]: + """Reads accounts.json and validates content.""" + with open(path, "r", encoding="utf-8") as f: + accounts = json.load(f) + + for acc in accounts: + for key in ("name", "account_number", "token"): + if key not in acc: + raise ValueError(f"Missing '{key}' in account config: {acc}") + return accounts + + +def fio_url_for_period(token: str, d_from: date, d_to: date) -> str: + """Constructs the URL for Fio REST API periods endpoint.""" + from_str = d_from.strftime("%Y-%m-%d") + to_str = d_to.strftime("%Y-%m-%d") + return f"https://fioapi.fio.cz/v1/rest/periods/{token}/{from_str}/{to_str}/transactions.json" + + +def fetch_fio_json(token: str, d_from: date, d_to: date) -> Any: + """Calls Fio API and fetches JSON.""" + url = fio_url_for_period(token, d_from, d_to) + resp = requests.get(url, timeout=30) + + if resp.status_code != 200: + print(f" ❌ HTTP {resp.status_code} from Fio: {url}", flush=True) + return None + + try: + return resp.json() + except json.JSONDecodeError: + print(" ❌ Cannot decode JSON from Fio response", flush=True) + return None + + +def safe_col(t: dict, n: int) -> Any: + """SAFE ACCESSOR for Fio transaction column numbers (ColumnN).""" + key = f"column{n}" + val = t.get(key) + if not val: + return None + return val.get("value") + + +def clean_date(dt_str: str) -> str: + """Strips timezone from Fio date string ("YYYY-MM-DD+HH:MM") → "YYYY-MM-DD".""" + if not dt_str: + return None + return str(dt_str)[:10] + + +def ensure_dir(path: Path): + """Creates directory if it doesn’t exist.""" + path.mkdir(parents=True, exist_ok=True) + + +def save_json_for_account(base_dir: str, account_cfg: dict, data: dict, d_from: date, d_to: date) -> Path: + """Saves raw JSON to disk.""" + acc_num_raw = account_cfg["account_number"] + acc_folder_name = acc_num_raw.replace("/", "_") + + out_dir = Path(base_dir) / acc_folder_name + ensure_dir(out_dir) + + filename = f"{d_from.strftime('%Y-%m-%d')}_to_{d_to.strftime('%Y-%m-%d')}.json" + out_path = out_dir / filename + + with open(out_path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + return out_path + + +def generate_fallback_id(fio_acc_id: str, t: dict) -> str: + """ + Vygeneruje deterministický fallback ID, pokud chybí Column22 (id pohybu). + Použije SHA1 hash z několika sloupců a ořízne na 20 znaků, aby se vešlo + do VARCHAR(20) primárního klíče. + """ + raw_date = clean_date(safe_col(t, 0)) or "" + amount = str(safe_col(t, 1) or "") + protiucet = str(safe_col(t, 2) or "") + vs = str(safe_col(t, 5) or "") + + source = f"{fio_acc_id}|{raw_date}|{amount}|{protiucet}|{vs}" + digest = hashlib.sha1(source.encode("utf-8")).hexdigest() + return digest[:20] + + +# ========================================= +# MAIN IMPORT LOGIC +# ========================================= + +def main(): + start_all = time.time() + + today = date.today() + d_from = today - timedelta(days=DAYS_BACK) + d_to = today + + print("=== Fio multi-account import v3 (PK fix, lepší logování) ===", flush=True) + print(f"Období: {d_from} až {d_to}", flush=True) + + # Load all accounts from accounts.json + try: + accounts = load_accounts(ACCOUNTS_FILE) + except Exception as e: + print(f"FATÁLNÍ CHYBA při načítání účtů: {e}", flush=True) + return + + print(f" Účtů v konfiguraci: {len(accounts)}\n", flush=True) + + # Připojení k DB + try: + conn = mysql.connector.connect( + host=DB["host"], + port=DB["port"], + user=DB["user"], + password=DB["password"], + database=DB["database"], + charset=DB["charset"], + ) + cur = conn.cursor() + except Error as e: + print(f"FATÁLNÍ CHYBA při připojení k DB: {e}", flush=True) + return + + # SQL INSERT dotaz přizpůsobený nové DB struktuře + sql = """ + INSERT INTO transactions + ( + id_operace, cislo_uctu, transaction_date, amount, currency, + protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ, + vs, ks, ss, uziv_identifikace, zprava_pro_prijemce, + provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce + ) + VALUES + ( + %(id_operace)s, %(cislo_uctu)s, %(transaction_date)s, %(amount)s, %(currency)s, + %(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s, + %(vs)s, %(ks)s, %(ss)s, %(uziv_identifikace)s, %(zprava_pro_prijemce)s, + %(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s, %(reference_platce)s + ) + ON DUPLICATE KEY UPDATE + cislo_uctu = VALUES(cislo_uctu), + transaction_date = VALUES(transaction_date), + amount = VALUES(amount), + currency = VALUES(currency), + protiucet = VALUES(protiucet), + kod_banky = VALUES(kod_banky), + nazev_protiuctu = VALUES(nazev_protiuctu), + nazev_banky = VALUES(nazev_banky), + typ = VALUES(typ), + vs = VALUES(vs), + ks = VALUES(ks), + ss = VALUES(ss), + uziv_identifikace = VALUES(uziv_identifikace), + zprava_pro_prijemce = VALUES(zprava_pro_prijemce), + provedl = VALUES(provedl), + id_pokynu = VALUES(id_pokynu), + komentar = VALUES(komentar), + upr_objem_mena = VALUES(upr_objem_mena), + api_bic = VALUES(api_bic), + reference_platce = VALUES(reference_platce) + """ + + total_inserted = 0 + total_skipped_pk = 0 + total_skipped_error = 0 + + # ====================================================== + # PROCESS EACH ACCOUNT IN accounts.json + # ====================================================== + for acc in accounts: + name = acc["name"] + cfg_acc_num = acc["account_number"] + token = acc["token"] + + print(f"--- Účet: {name} ({cfg_acc_num}) ---", flush=True) + t0 = time.time() + + # 1) Download JSON from Fio API + data = fetch_fio_json(token, d_from, d_to) + if data is None: + print(" Přeskakuji, žádná data / chyba API.\n", flush=True) + continue + + # 2) Save raw JSON file to disk + try: + json_path = save_json_for_account(JSON_BASE_DIR, acc, data, d_from, d_to) + print(f" JSON uložen do: {json_path}", flush=True) + except Exception as e: + print(f" ❌ Chyba při ukládání JSON souboru: {e}", flush=True) + + # 3) Extract transactions from JSON tree + tlist = data.get("accountStatement", {}).get("transactionList", {}).get("transaction", []) + + if isinstance(tlist, dict): + tlist = [tlist] + + print(f" Počet transakcí v období: {len(tlist)}", flush=True) + + if not tlist: + print(" Žádné transakce, jdu dál.\n", flush=True) + continue + + fio_acc_id = data.get("accountStatement", {}).get("info", {}).get("accountId") + + if cfg_acc_num and fio_acc_id and cfg_acc_num.split("/")[0] not in fio_acc_id: + print( + f" ⚠ Upozornění: accountId z Fio ({fio_acc_id}) " + f"se neshoduje s account_number v konfiguraci ({cfg_acc_num})", + flush=True, + ) + + # 4) Build list of MySQL rows + rows = [] + skipped_pk_account = 0 + + for t in tlist: + # id_operace = Column22 (tvé "id pohybu") + id_operace_val = safe_col(t, 22) + + # Pokud chybí, vygenerujeme si stabilní fallback (hash) – vejde se do VARCHAR(20) + if id_operace_val is None: + fallback = generate_fallback_id(fio_acc_id or "", t) + id_operace_val = fallback + # Můžeš odkomentovat, pokud chceš vidět, kde se používá fallback + # print(f" ⚠ Fallback id_operace (hash) pro transakci: {fallback}", flush=True) + + # Bez PK nemá smysl zápis – jen pro jistotu, fallback by měl vše pokrýt + if id_operace_val is None: + skipped_pk_account += 1 + continue + + transaction_date = clean_date(safe_col(t, 0)) + if not transaction_date: + # Bez data by insert stejně spadl (NOT NULL), tak to raději přeskočíme + if DEBUG_ON_ERROR: + print(f" ⚠ Přeskakuji transakci bez data, id_operace={id_operace_val}", flush=True) + skipped_pk_account += 1 + continue + + id_pokynu_val = safe_col(t, 19) # tvé "id pokynu" = Column19 + + row = { + "id_operace": str(id_operace_val), + "cislo_uctu": fio_acc_id, + "transaction_date": transaction_date, + "amount": safe_col(t, 1), + "currency": safe_col(t, 14), + "typ": safe_col(t, 8), + "provedl": safe_col(t, 9), + + "protiucet": safe_col(t, 2), + "kod_banky": safe_col(t, 3), + "nazev_protiuctu": safe_col(t, 10), + "nazev_banky": safe_col(t, 12), + "api_bic": safe_col(t, 26), + + "vs": safe_col(t, 5), + "ks": safe_col(t, 4), + "ss": safe_col(t, 6), + + "zprava_pro_prijemce": safe_col(t, 16), + "uziv_identifikace": safe_col(t, 7), + "komentar": safe_col(t, 25), + "upr_objem_mena": safe_col(t, 18), + "id_pokynu": str(id_pokynu_val) if id_pokynu_val is not None else None, + "reference_platce": safe_col(t, 27), + } + rows.append(row) + + if skipped_pk_account: + print(f" ⚠ Přeskočeno {skipped_pk_account} transakcí kvůli chybějícímu/invalidnímu PK nebo datu.", flush=True) + + total_skipped_pk += skipped_pk_account + + # 5) INSERT rows into MySQL in batches + inserted = 0 + skipped_error_account = 0 + + for i in range(0, len(rows), BATCH_SIZE): + chunk = rows[i: i + BATCH_SIZE] + + try: + cur.executemany(sql, chunk) + conn.commit() + inserted += len(chunk) + + except Error as e: + print(f" ❌ Chyba při zápisu batch do DB: {e}", flush=True) + conn.rollback() + + if DEBUG_ON_ERROR: + print(" ► Přecházím na per-row insert pro detail chyb...", flush=True) + + for row in chunk: + try: + cur.execute(sql, row) + conn.commit() + inserted += 1 + except Error as e_row: + skipped_error_account += 1 + conn.rollback() + print( + f" ✗ Chybná transakce id_operace={row.get('id_operace')} " + f"datum={row.get('transaction_date')} částka={row.get('amount')} " + f"→ {e_row}", + flush=True, + ) + + elapsed = time.time() - t0 + total_inserted += inserted + total_skipped_error += skipped_error_account + + print( + f" ✓ Zapsáno (insert/update): {inserted} řádků do DB " + f"(přeskočeno chybějící PK/dat {skipped_pk_account}, chybou insertu {skipped_error_account}) " + f"za {elapsed:.2f} s\n", + flush=True, + ) + + # Close DB + cur.close() + conn.close() + + total_elapsed = time.time() - start_all + + print( + f"=== Hotovo. Celkem zapsáno {total_inserted} transakcí. " + f"Přeskočeno kvůli PK/datům: {total_skipped_pk}, kvůli chybě insertu: {total_skipped_error}. " + f"Celkový čas: {total_elapsed:.2f} s ===", + flush=True, + ) + + +# ====================================================== +# ENTRY POINT +# ====================================================== + +if __name__ == "__main__": + main() diff --git a/#10 Download reports/20 Report.py b/#10 Download reports/20 Report.py new file mode 100644 index 0000000..163b26e --- /dev/null +++ b/#10 Download reports/20 Report.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import sys +import io + +# Force UTF-8 output for Scheduled Tasks +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') +sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') + +""" +FIO EXPORT SCRIPT — FINÁLNÍ VERZE S KONTROLNÍM VÝPISEM +------------------------------------------- + +Skript nyní vypisuje POČET ŘÁDKŮ NAČTENÝCH Z DATABÁZE pro každý list. +Tím ověříme, zda se data ztrácí při čtení z DB, nebo až při exportu do Excelu. +""" + +import mysql.connector +from mysql.connector import Error +from openpyxl import Workbook +from openpyxl.styles import Font, PatternFill, Alignment, Border, Side +from datetime import datetime, date as dt_date +from decimal import Decimal +import os +import glob +import json +from typing import List, Dict, Any + +# ====================================================== +# CONFIGURATION +# ====================================================== + +# MySQL server parameters +DB_HOST = "192.168.1.76" +DB_PORT = 3307 +DB_USER = "root" +DB_PASS = "Vlado9674+" +DB_NAME = "fio" + +# Where to save Excel files +OUTPUT_DIR = r"Z:\Dropbox\!!!Days\Downloads Z230" + +# JSON file with list of accounts (name + account_number) +ACCOUNTS_JSON = r"C:\Users\vlado\PycharmProjects\FIO\accounts.json" + +# VÍCE SLOUPCŮ PŘENASTAVENO NA TEXT +TEXT_COLUMNS = [ + "cislo_uctu", + "protiucet", + "kod_banky", + "vs", + "ks", + "ss", + "id_operace", + "id_pokynu" +] + + +# ====================================================== +# REMOVE OLD EXPORT FILES (Beze změny) +# ====================================================== + +def cleanup_old_exports(): + """ + Deletes older versions of exported XLSX files. + """ + patterns = [ + os.path.join(OUTPUT_DIR, "*FIO*transaction*.xlsx"), + os.path.join(OUTPUT_DIR, "*FIO*transactions*.xlsx"), + os.path.join(OUTPUT_DIR, "*FIO_transactions*.xlsx"), + ] + + for pattern in patterns: + for file in glob.glob(pattern): + try: + os.remove(file) + print(f"🗑 Deleted old export: {file}") + except: + pass + + +# ====================================================== +# CORE EXCEL FORMATTING FUNCTION (Oprava konverze typů zachována) +# ====================================================== + +def format_sheet(ws, rows: List[Dict[str, Any]], headers: List[str]): + """ + Applies ALL formatting rules to a worksheet: + FIX: Explicitní konverze datových typů pro OpenPyXL zachována. + """ + + # ------------------------------- + # 1) Format HEADER row + # ------------------------------- + for col_idx in range(1, len(headers) + 1): + cell = ws.cell(row=1, column=col_idx) + cell.font = Font(bold=True) + cell.fill = PatternFill(start_color="FFFF00", fill_type="solid") + + # ------------------------------- + # 2) Write DATA rows (OPRAVA KONVERZE TYPŮ) + # ------------------------------- + # Klíčová konverze pro Decimal/Date objekty + for row in rows: + excel_row = [] + for h in headers: + val = row[h] + + # 🛠️ KLÍČOVÁ OPRAVA: Konverze MySQL/Decimal objektů na nativní Python typy + if isinstance(val, Decimal): + val = float(val) + elif isinstance(val, dt_date) and not isinstance(val, datetime): + val = val.strftime("%Y-%m-%d") + # ----------------------------------------------------------- + + # Pro text-sensitive sloupce (ID, symboly), zapisuj ="hodnota" + if h in TEXT_COLUMNS and val is not None: + val = str(val) + excel_row.append(f'="{val}"') + else: + excel_row.append(val) + + ws.append(excel_row) + + # ------------------------------- + # 3) Background coloring by "amount" + # ------------------------------- + fill_red = PatternFill(start_color="FFFFDDDD", end_color="FFFFDDDD", fill_type="solid") + fill_green = PatternFill(start_color="FFEEFFEE", end_color="FFEEFFEE", fill_type="solid") + + try: + amount_col_index = headers.index("amount") + 1 + except ValueError: + amount_col_index = -1 + + if amount_col_index != -1: + for row_idx in range(2, len(rows) + 2): + cell_amount = ws.cell(row=row_idx, column=amount_col_index) + + try: + value = float(str(cell_amount.value).strip('="')) + except: + value = 0 + + fill = fill_red if value < 0 else fill_green + for col_idx in range(1, len(headers) + 1): + ws.cell(row=row_idx, column=col_idx).fill = fill + + # ------------------------------- + # 4) Fixed column widths (22 sloupců) + # ------------------------------- + fixed_widths = [ + 13, 14, 11, 14, 8, 14, 11, 30, 30, 25, + 13, 13, 13, 35, 30, 15, 13, 30, 20, 13, + 30, 20 + ] + + if len(fixed_widths) < len(headers): + fixed_widths.extend([15] * (len(headers) - len(fixed_widths))) + + for i, width in enumerate(fixed_widths, start=1): + col_letter = chr(64 + i) + ws.column_dimensions[col_letter].width = width + + # ------------------------------- + # 5) Add borders + alignment + # ------------------------------- + thin = Side(border_style="thin", color="000000") + border = Border(left=thin, right=thin, top=thin, bottom=thin) + align_center = Alignment(horizontal="center") + + total_rows = len(rows) + 1 + total_cols = len(headers) + + ALIGN_CENTER_COLS = ["id_operace", "transaction_date", "currency", "kod_banky", "vs", "ks", "ss"] + center_indices = [headers.index(col) + 1 for col in ALIGN_CENTER_COLS if col in headers] + + for row_idx in range(1, total_rows + 1): + for col_idx in range(1, total_cols + 1): + cell = ws.cell(row=row_idx, column=col_idx) + cell.border = border + if col_idx in center_indices: + cell.alignment = align_center + + ws.freeze_panes = "A2" + ws.auto_filter.ref = ws.dimensions + + +# ====================================================== +# MAIN EXPORT PROCESS +# ====================================================== + +def export_fio(): + print("Connecting to MySQL...") + + # Connect to MySQL database + try: + conn = mysql.connector.connect( + host=DB_HOST, + port=DB_PORT, + user=DB_USER, + password=DB_PASS, + database=DB_NAME + ) + except Error as e: + print("❌ Failed to connect:", e) + return + + # Používáme dictionary=True pro získání dat jako slovník + cur = conn.cursor(dictionary=True) + + # ------------------------------- + # Load accounts.json + # ------------------------------- + with open(ACCOUNTS_JSON, "r", encoding="utf-8") as f: + accounts = json.load(f) + + # ------------------------------- + # Define priority first sheets + # ------------------------------- + preferred_order = [ + "CZK rodina", + "CZK ordinace", + "CZK na jídlo", + "CZK TrialHelp", + "CZK maminka svojě věci" + ] + + accounts_sorted = [] + + # Step 1: add priority accounts first + for pref in preferred_order: + for acc in accounts: + if acc["name"] == pref: + accounts_sorted.append(acc) + + # Step 2: add remaining accounts afterward + for acc in accounts: + if acc not in accounts_sorted: + accounts_sorted.append(acc) + + # ------------------------------- + # Create a new Excel workbook + # ------------------------------- + wb = Workbook() + wb.remove(wb.active) # remove default empty sheet + + # ------------------------------- + # FIRST SHEET: ALL TRANSACTIONS + # ------------------------------- + cur.execute("SELECT * FROM transactions ORDER BY transaction_date DESC") + all_rows = cur.fetchall() + + if all_rows: + headers = list(all_rows[0].keys()) + # Tisk počtu řádků pro "ALL" + print(f"➡ Sheet: ALL | Řádků z DB: {len(all_rows)}") + + ws_all = wb.create_sheet(title="ALL") + ws_all.append(headers) + format_sheet(ws_all, all_rows, headers) + + # ------------------------------- + # INDIVIDUAL SHEETS PER ACCOUNT + # ------------------------------- + for acc in accounts_sorted: + acc_num = acc["account_number"] + sheet_name = acc["name"][:31] # Excel sheet name limit + + print(f"➡ Creating sheet: {sheet_name}", end=' | ') # Tisk názvu listu + + query = f""" + SELECT * + FROM transactions + WHERE cislo_uctu = '{acc_num}' + ORDER BY transaction_date DESC + """ + + cur.execute(query) + rows = cur.fetchall() + + # VÝPIS POČTU ZÁZNAMŮ Z DB + print(f"Řádků z DB: {len(rows)}") + + if not rows: + print(f"⚠ No data for {sheet_name}") + continue + + headers = list(rows[0].keys()) + ws = wb.create_sheet(title=sheet_name) + ws.append(headers) + + format_sheet(ws, rows, headers) + + conn.close() + + # ------------------------------- + # Save Excel file + # ------------------------------- + + cleanup_old_exports() + + # File name includes timestamp + timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S") + filename = f"{timestamp} FIO transactions.xlsx" + output_file = os.path.join(OUTPUT_DIR, filename) + + wb.save(output_file) + + print(f"✅ Export complete:\n{output_file}") + + +# ====================================================== +# MAIN ENTRY POINT +# ====================================================== + +if __name__ == "__main__": + export_fio() \ No newline at end of file diff --git a/#10 Download reports/30 Report ordinace.py b/#10 Download reports/30 Report ordinace.py new file mode 100644 index 0000000..6834204 --- /dev/null +++ b/#10 Download reports/30 Report ordinace.py @@ -0,0 +1,268 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import sys +import io + +# UTF-8 console for Scheduled Tasks +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') +sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') + +import mysql.connector +from mysql.connector import Error +from openpyxl import Workbook +from openpyxl.styles import Font, PatternFill, Alignment, Border, Side +from datetime import datetime, date as dt_date +from decimal import Decimal +from pathlib import Path +# ============================== +# DELETE OLD REPORTS (OPTION C) +# ============================== + +def delete_all_old_reports(directory: Path): + """Deletes all previously generated ordinace expense reports.""" + pattern = "*fio ordinace transactions.xlsx" + deleted = 0 + + for f in directory.glob(pattern): + try: + f.unlink() + deleted += 1 + print(f"🗑 Deleted old report: {f.name}") + except Exception as e: + print(f"❌ Could not delete {f.name}: {e}") + + if deleted == 0: + print("ℹ No old reports to delete.") + else: + print(f"✓ Deleted {deleted} old reports.") + + +# ====================================================== +# CONFIG +# ====================================================== + +DB = { + "host": "192.168.1.76", + "port": 3307, + "user": "root", + "password": "Vlado9674+", + "database": "fio", +} + +ORDINACE_ACCOUNT = "2800046620" + +REPORTOVAT = { + "VZP": "1114007221", + "VOZP": "2010009091", + "ČPZP": "2054108761", + "OZP": "2070101041", + "ZPŠ": "2090309181", + "ZPMV": "2112108031", +} + +OUTPUT_DIR = Path(r"z:\Dropbox\Ordinace\Reporty") + +TEXT_COLUMNS = [ + "cislo_uctu", "protiucet", "kod_banky", + "vs", "ks", "ss", + "id_operace", "id_pokynu" +] + + +# ====================================================== +# FORMAT APPLYING (copied from main report) +# ====================================================== + +def format_sheet(ws, rows, headers): + + # ---------------------- HEADER ----------------------- + for col_idx in range(1, len(headers) + 1): + cell = ws.cell(row=1, column=col_idx) + cell.font = Font(bold=True) + cell.fill = PatternFill(start_color="FFFF00", fill_type="solid") + + # ---------------------- DATA ROWS -------------------- + for row in rows: + excel_row = [] + for h in headers: + val = row[h] + + # Convert MySQL data types + if isinstance(val, Decimal): + val = float(val) + elif isinstance(val, dt_date): + val = val.strftime("%Y-%m-%d") + + # For certain columns, force ="text" + if h in TEXT_COLUMNS and val is not None: + excel_row.append(f'="{val}"') + else: + excel_row.append(val) + + ws.append(excel_row) + + # ---------------------- COLORING --------------------- + fill_red = PatternFill(start_color="FFFFDDDD", fill_type="solid") + fill_green = PatternFill(start_color="FFEEFFEE", fill_type="solid") + + try: + amount_col = headers.index("amount") + 1 + except ValueError: + amount_col = -1 + + if amount_col != -1: + for r in range(2, len(rows) + 2): + cell = ws.cell(row=r, column=amount_col) + try: + value = float(str(cell.value).strip('="')) + except: + value = 0 + + fill = fill_red if value < 0 else fill_green + for c in range(1, len(headers) + 1): + ws.cell(row=r, column=c).fill = fill + + # ---------------------- COLUMN WIDTHS ----------------- + fixed_widths = [ + 13, 14, 11, 14, 8, 14, 11, 30, 30, 25, + 13, 13, 13, 35, 30, 15, 13, 30, 20, 13, + 30, 20 + ] + if len(fixed_widths) < len(headers): + fixed_widths.extend([15] * (len(headers) - len(fixed_widths))) + + for i, width in enumerate(fixed_widths, start=1): + letter = chr(64 + i) + ws.column_dimensions[letter].width = width + + # ---------------------- BORDERS & ALIGNMENT ---------- + thin = Side(border_style="thin", color="000000") + border = Border(left=thin, right=thin, top=thin, bottom=thin) + align_center = Alignment(horizontal="center") + + center_cols = ["id_operace", "transaction_date", "currency", "kod_banky", "vs", "ks", "ss"] + center_indices = [headers.index(c) + 1 for c in center_cols if c in headers] + + total_rows = len(rows) + 1 + total_cols = len(headers) + + for r in range(1, total_rows + 1): + for c in range(1, total_cols + 1): + cell = ws.cell(row=r, column=c) + cell.border = border + if c in center_indices: + cell.alignment = align_center + + ws.freeze_panes = "A2" + ws.auto_filter.ref = ws.dimensions + + +# ====================================================== +# EXPORT +# ====================================================== + +def export_ordinace(): + print("Connecting MySQL...") + conn = mysql.connector.connect(**DB) + cur = conn.cursor(dictionary=True) + + # ============================ + # Load ALL transactions for ordinace + # ============================ + sql_all = f""" + SELECT * + FROM transactions + WHERE cislo_uctu = '{ORDINACE_ACCOUNT}' + ORDER BY transaction_date DESC; + """ + cur.execute(sql_all) + all_rows = cur.fetchall() + + if not all_rows: + print("❌ No transactions found for ordinace account.") + return + + headers = list(all_rows[0].keys()) + + # Workbook + wb = Workbook() + wb.remove(wb.active) + + # --------------------- ALL sheet --------------------- + ws_all = wb.create_sheet("ALL ordinace") + ws_all.append(headers) + format_sheet(ws_all, all_rows, headers) + + print(f"➡ ALL ordinace rows: {len(all_rows)}") + + # --------------------- INSURANCE sheets --------------- + summary = [] + + for name, acc in REPORTOVAT.items(): + print(f"➡ Pojišťovna {name} ({acc})") + + sql = f""" + SELECT * + FROM transactions + WHERE cislo_uctu = '{ORDINACE_ACCOUNT}' + AND ( + protiucet <> '2070101041' + OR (protiucet = '2070101041' AND amount > 0) + ) + AND protiucet = '{acc}' + ORDER BY transaction_date DESC; + """ + + cur.execute(sql) + rows = cur.fetchall() + + count = len(rows) + summa = sum(float(r["amount"]) for r in rows) if rows else 0 + + summary.append({ + "Pojišťovna": name, + "Účet": acc, + "Počet transakcí": count, + "Součet": summa + }) + + if not rows: + print(f" ⚠ No rows") + continue + + ws = wb.create_sheet(name) + ws.append(headers) + format_sheet(ws, rows, headers) + + print(f" ✓ {count} rows, sum {summa:.2f} Kč") + + # --------------------- SUMMARY sheet ----------------- + ws_s = wb.create_sheet("Přehled") + ws_s.append(["Pojišťovna", "Účet", "Počet transakcí", "Součet Kč"]) + + for row in summary: + ws_s.append([ + row["Pojišťovna"], + row["Účet"], + row["Počet transakcí"], + f"{row['Součet']:.2f}" + ]) + + # =========================== + # Save Excel + # =========================== + timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S") + out_file = OUTPUT_DIR / f"{timestamp} FIO ordinace transactions.xlsx" + + wb.save(out_file) + print(f"\n✅ Export hotový:\n{out_file}") + + +# ====================================================== +# MAIN +# ====================================================== + +if __name__ == "__main__": + delete_all_old_reports(OUTPUT_DIR) + export_ordinace() diff --git a/#10 Download reports/31 Report ordinace expenses.py b/#10 Download reports/31 Report ordinace expenses.py new file mode 100644 index 0000000..c2e2541 --- /dev/null +++ b/#10 Download reports/31 Report ordinace expenses.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Expenses Ordinace – Report Generator (2025) +------------------------------------------ +Reads JSON with tab definitions + SQL queries. +Creates one Excel workbook with multiple sheets. +Uniform formatting for all tabs. +Deletes old reports before saving the new one. +""" + +import json +import pandas as pd +import pymysql +from datetime import datetime +from pathlib import Path +from openpyxl import Workbook +from openpyxl.styles import Font, Alignment, PatternFill, Border, Side +from openpyxl.utils.dataframe import dataframe_to_rows + + +# ============================== +# CONFIG +# ============================== + +JSON_TABS = r"expenses_tabs.json" + +MYSQL = { + "host": "192.168.1.76", + "port": 3307, + "user": "root", + "password": "Vlado9674+", + "database": "fio", + "charset": "utf8mb4" +} + +EXPORT_DIR = Path(r"z:\Dropbox\Ordinace\Reporty") +EXPORT_DIR.mkdir(exist_ok=True, parents=True) + + +# ============================== +# DELETE OLD REPORTS (OPTION C) +# ============================== + +def delete_all_old_reports(directory: Path): + """Deletes all previously generated ordinace expense reports.""" + pattern = "*fio ordinace expenses.xlsx" + deleted = 0 + + for f in directory.glob(pattern): + try: + f.unlink() + deleted += 1 + print(f"🗑 Deleted old report: {f.name}") + except Exception as e: + print(f"❌ Could not delete {f.name}: {e}") + + if deleted == 0: + print("ℹ No old reports to delete.") + else: + print(f"✓ Deleted {deleted} old reports.") + + +# ============================== +# FORMATTING HELPERS +# ============================== + +def format_sheet(ws): + """Apply column widths, header styling, borders, autofilter.""" + + # Yellow header (Option A) + header_fill = PatternFill("solid", fgColor="FFF200") + bold_font = Font(bold=True, color="000000") + center_align = Alignment(horizontal="center", vertical="center") + + thin = Side(border_style="thin", color="000000") + border = Border(left=thin, right=thin, top=thin, bottom=thin) + + # Autofilter + if ws.max_row > 1: + ws.auto_filter.ref = ws.dimensions + + # Auto column widths + for col in ws.columns: + max_len = 0 + letter = col[0].column_letter + for cell in col: + try: + max_len = max(max_len, len(str(cell.value))) + except Exception: + pass + ws.column_dimensions[letter].width = min(max_len + 2, 50) + + # Style header row + for cell in ws[1]: + cell.font = bold_font + cell.fill = header_fill + cell.alignment = center_align + cell.border = border + + # Border for all body cells + for row in ws.iter_rows(min_row=2): + for cell in row: + cell.border = border + + +# ============================== +# MAIN +# ============================== + +def main(): + print("=== Expenses Ordinace Report (with cleanup) ===") + + # Load JSON tabs + with open(JSON_TABS, "r", encoding="utf-8") as f: + config = json.load(f) + + tabs = config.get("tabs", []) + print(f"Loaded {len(tabs)} tab definitions.") + + # Connect DB + conn = pymysql.connect(**MYSQL) + + # Prepare workbook + wb = Workbook() + wb.remove(wb.active) + + # Process each tab + for tab in tabs: + name = tab["name"] + sql = tab["sql"] + + print(f"→ Running tab: {name}") + + df = pd.read_sql(sql, conn) + df = df.fillna("") + + # Swap columns N (index 13) and O (index 14) + cols = df.columns.tolist() + if len(cols) >= 15: + cols[13], cols[14] = cols[14], cols[13] + df = df[cols] + + # Create sheet + sheet_name = name[:31] + ws = wb.create_sheet(sheet_name) + + # Write DataFrame + for row in dataframe_to_rows(df, index=False, header=True): + ws.append(row) + + # Apply formatting + format_sheet(ws) + + conn.close() + + # Delete older reports + delete_all_old_reports(EXPORT_DIR) + + # Save new report + OUTFILE = EXPORT_DIR / f"{datetime.now():%Y-%m-%d %H-%M-%S} FIO ordinace expenses.xlsx" + wb.save(OUTFILE) + + print(f"\n✔ Report generated:\n{OUTFILE}") + + +if __name__ == "__main__": + main() diff --git a/#10 Download reports/40 import_fio_history.py b/#10 Download reports/40 import_fio_history.py new file mode 100644 index 0000000..010b3e6 --- /dev/null +++ b/#10 Download reports/40 import_fio_history.py @@ -0,0 +1,262 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import sys +import io +import time +import json +from pathlib import Path +import hashlib +import mysql.connector +from mysql.connector import Error + +# ==================================================================== +# UTF-8 OUTPUT +# ==================================================================== +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') +sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') + +# ==================================================================== +# CONFIGURATION +# ==================================================================== + +BASE_DIR = Path(r"z:\Dropbox\!!!Days\Downloads Z230\Fio") + +DB = { + "host": "192.168.1.76", + "port": 3307, + "user": "root", + "password": "Vlado9674+", + "database": "fio", + "charset": "utf8mb4", +} + +BATCH_SIZE = 500 +DEBUG_ON_ERROR = True + + +# ==================================================================== +# HELPERS +# ==================================================================== + +def safe_col(t: dict, n: int): + key = f"column{n}" + val = t.get(key) + return val.get("value") if val else None + + +def clean_date(dt_str: str): + if not dt_str: + return None + return dt_str[:10] + + +def generate_fallback_id(account_id: str, t: dict) -> str: + raw_date = clean_date(safe_col(t, 0)) or "" + amount = str(safe_col(t, 1) or "") + protiucet = str(safe_col(t, 2) or "") + vs = str(safe_col(t, 5) or "") + src = f"{account_id}|{raw_date}|{amount}|{protiucet}|{vs}" + digest = hashlib.sha1(src.encode("utf-8")).hexdigest() + return digest[:20] + + +def load_json_file(p: Path): + try: + with open(p, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + print(f" ❌ Nelze načíst JSON: {p} → {e}") + return None + + +# ==================================================================== +# MAIN +# ==================================================================== + +def main(): + start_all = time.time() + + print("=== Fio HISTORICKÝ IMPORT (ze všech JSON na disku) ===\n", flush=True) + print(f"Hledám JSON soubory v: {BASE_DIR}", flush=True) + + # Najdeme všechny JSONy ve všech podadresářích + all_json_paths = list(BASE_DIR.rglob("*.json")) + print(f"Nalezeno JSON souborů: {len(all_json_paths)}\n", flush=True) + + if not all_json_paths: + print("Nenalezeny žádné JSON soubory. Konec.") + return + + # DB připojení + try: + conn = mysql.connector.connect( + host=DB["host"], + port=DB["port"], + user=DB["user"], + password=DB["password"], + database=DB["database"], + charset=DB["charset"] + ) + cur = conn.cursor() + except Error as e: + print(f"FATAL DB ERROR: {e}") + return + + sql = """ + INSERT INTO transactions + ( + cislo_uctu, id_operace, transaction_date, amount, currency, + protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ, + vs, ks, ss, uziv_identifikace, zprava_pro_prijemce, + provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce + ) + VALUES + ( + %(cislo_uctu)s, %(id_operace)s, %(transaction_date)s, %(amount)s, %(currency)s, + %(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s, + %(vs)s, %(ks)s, %(ss)s, %(uziv_identifikace)s, %(zprava_pro_prijemce)s, + %(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s, %(reference_platce)s + ) + ON DUPLICATE KEY UPDATE + transaction_date = VALUES(transaction_date), + amount = VALUES(amount), + currency = VALUES(currency), + protiucet = VALUES(protiucet), + kod_banky = VALUES(kod_banky), + nazev_protiuctu = VALUES(nazev_protiuctu), + nazev_banky = VALUES(nazev_banky), + typ = VALUES(typ), + vs = VALUES(vs), + ks = VALUES(ks), + ss = VALUES(ss), + uziv_identifikace = VALUES(uziv_identifikace), + zprava_pro_prijemce = VALUES(zprava_pro_prijemce), + provedl = VALUES(provedl), + id_pokynu = VALUES(id_pokynu), + komentar = VALUES(komentar), + upr_objem_mena = VALUES(upr_objem_mena), + api_bic = VALUES(api_bic), + reference_platce = VALUES(reference_platce) + """ + + total_processed_files = 0 + total_rows_inserted = 0 + total_rows_skipped = 0 + + # ============================================ + # PROCES JSON SOUBOR PO SOUBORU + # ============================================ + for p in all_json_paths: + total_processed_files += 1 + print(f"--- Soubor {total_processed_files}/{len(all_json_paths)}: {p}", flush=True) + + data = load_json_file(p) + if not data: + continue + + # JSON má strukturu jako při fetchnutí z API + account_info = data.get("accountStatement", {}).get("info", {}) + account_id = account_info.get("accountId") + + if not account_id: + print(" ⚠ Nelze zjistit cislo_uctu z JSON! Přeskakuji.") + continue + + tlist = data.get("accountStatement", {}).get("transactionList", {}).get("transaction", []) + if isinstance(tlist, dict): + tlist = [tlist] + + print(f" Počet transakcí: {len(tlist)}", flush=True) + + if not tlist: + continue + + rows = [] + skipped_local = 0 + + # Převést transakce → DB řádky + for t in tlist: + id_operace_val = safe_col(t, 22) + if id_operace_val is None: + id_operace_val = generate_fallback_id(account_id, t) + + transaction_date = clean_date(safe_col(t, 0)) + if not transaction_date: + skipped_local += 1 + continue + + id_pokynu_val = safe_col(t, 19) + + row = { + "cislo_uctu": account_id, + "id_operace": str(id_operace_val), + "transaction_date": transaction_date, + "amount": safe_col(t, 1), + "currency": safe_col(t, 14), + "protiucet": safe_col(t, 2), + "kod_banky": safe_col(t, 3), + "nazev_protiuctu": safe_col(t, 10), + "nazev_banky": safe_col(t, 12), + "api_bic": safe_col(t, 26), + "typ": safe_col(t, 8), + "provedl": safe_col(t, 9), + "vs": safe_col(t, 5), + "ks": safe_col(t, 4), + "ss": safe_col(t, 6), + "zprava_pro_prijemce": safe_col(t, 16), + "uziv_identifikace": safe_col(t, 7), + "komentar": safe_col(t, 25), + "upr_objem_mena": safe_col(t, 18), + "id_pokynu": str(id_pokynu_val) if id_pokynu_val else None, + "reference_platce": safe_col(t, 27), + } + rows.append(row) + + total_rows_skipped += skipped_local + print(f" Přeskočeno transakcí bez data/PK: {skipped_local}") + + # Batch insert + inserted = 0 + + for i in range(0, len(rows), BATCH_SIZE): + chunk = rows[i: i + BATCH_SIZE] + try: + cur.executemany(sql, chunk) + conn.commit() + inserted += len(chunk) + except Error as e: + print(f" ❌ Batch insert error: {e}") + conn.rollback() + + if DEBUG_ON_ERROR: + print(" ► Per-row insert for debugging…") + for row in chunk: + try: + cur.execute(sql, row) + conn.commit() + inserted += 1 + except Error as e_row: + conn.rollback() + print(f" ✗ Chyba transakce id_operace={row['id_operace']} → {e_row}") + + total_rows_inserted += inserted + print(f" ✓ Zapsáno/aktualizováno: {inserted}") + + # ====================== + # ZÁVĚR + # ====================== + cur.close() + conn.close() + + elapsed = time.time() - start_all + print("\n===== HOTOVO =====", flush=True) + print(f"Souborů zpracováno: {total_processed_files}") + print(f"Transakcí zapsáno/aktualizováno: {total_rows_inserted}") + print(f"Transakcí přeskočeno: {total_rows_skipped}") + print(f"Celkový čas: {elapsed:.2f} s") + print("==================", flush=True) + + +if __name__ == "__main__": + main() diff --git a/#10 Download reports/Trash/10 Read s auditem.py b/#10 Download reports/Trash/10 Read s auditem.py new file mode 100644 index 0000000..84213e2 --- /dev/null +++ b/#10 Download reports/Trash/10 Read s auditem.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import sys +import io +import time +from datetime import date, timedelta +from pathlib import Path +import json +import requests +import mysql.connector +from mysql.connector import Error +from typing import Dict, Any, List + +# ==================================================================== +# A. PONECHÁNO: Vynucení UTF-8 pro správnou diakritiku v plánovaných úlohách +# ==================================================================== +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') +sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') + +""" +FIO MULTI–ACCOUNT IMPORTER — KONEČNÁ VERZE S AUDITEM +============================================================ +Přidána zpětná kontrola (audit) po každé dávce, která ověřuje, +že se ID transakcí skutečně zapsaly do databáze. +""" + +# ========================================= +# CONFIGURATION +# ========================================= + +# JSON file containing multiple account configs: +# [ +#    { "name": "CZK rodina", "account_number": "2100046291", "token": "xxx" }, +#    ... +# ] +ACCOUNTS_FILE = r"/accounts.json" + +# Directory where raw JSON files from Fio API will be stored. +JSON_BASE_DIR = r"z:\Dropbox\!!!Days\Downloads Z230\Fio" + +# MySQL connection parameters +DB = { + "host": "192.168.1.76", + "port": 3307, + "user": "root", + "password": "Vlado9674+", + "database": "fio", + "charset": "utf8mb4", +} + +# How many transactions insert per batch (performance tuning) +BATCH_SIZE = 500 + +# How many days back we load from Fio (default = last 90 days) +DAYS_BACK = 90 + + +# ========================================= +# HELPERS (Beze změny) +# ========================================= + +def load_accounts(path: str) -> List[Dict[str, str]]: + """Reads accounts.json and validates content.""" + with open(path, "r", encoding="utf-8") as f: + accounts = json.load(f) + + for acc in accounts: + for key in ("name", "account_number", "token"): + if key not in acc: + raise ValueError(f"Missing '{key}' in account config: {acc}") + return accounts + + +def fio_url_for_period(token: str, d_from: date, d_to: date) -> str: + """Constructs the URL for Fio REST API periods endpoint.""" + from_str = d_from.strftime("%Y-%m-%d") + to_str = d_to.strftime("%Y-%m-%d") + return f"https://fioapi.fio.cz/v1/rest/periods/{token}/{from_str}/{to_str}/transactions.json" + + +def fetch_fio_json(token: str, d_from: date, d_to: date) -> Any: + """Calls Fio API and fetches JSON.""" + url = fio_url_for_period(token, d_from, d_to) + resp = requests.get(url, timeout=30) + + if resp.status_code != 200: + print(f"  ❌ HTTP {resp.status_code} from Fio: {url}", flush=True) + return None + + try: + return resp.json() + except json.JSONDecodeError: + print("  ❌ Cannot decode JSON from Fio response", flush=True) + return None + + +def safe_col(t: dict, n: int) -> Any: + """SAFE ACCESSOR for Fio transaction column numbers (ColumnN).""" + key = f"column{n}" + val = t.get(key) + if not val: + return None + return val.get("value") + + +def clean_date(dt_str: str) -> str: + """Strips timezone from Fio date string ("YYYY-MM-DD+HH:MM") → "YYYY-MM-DD".""" + if not dt_str: + return None + return str(dt_str)[:10] + + +def ensure_dir(path: Path): + """Creates directory if it doesn’t exist.""" + path.mkdir(parents=True, exist_ok=True) + + +def save_json_for_account(base_dir: str, account_cfg: dict, data: dict, d_from: date, d_to: date) -> Path: + """Saves raw JSON to disk.""" + acc_num_raw = account_cfg["account_number"] + acc_folder_name = acc_num_raw.replace("/", "_") + + out_dir = Path(base_dir) / acc_folder_name + ensure_dir(out_dir) + + filename = f"{d_from.strftime('%Y-%m-%d')}_to_{d_to.strftime('%Y-%m-%d')}.json" + out_path = out_dir / filename + + with open(out_path, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + return out_path + + +# ========================================= +# MAIN IMPORT LOGIC +# ========================================= + +def main(): + start_all = time.time() + + # Calculate time range (last N days) + today = date.today() + d_from = today - timedelta(days=DAYS_BACK) + d_to = today + + print("=== Fio multi-account import v2 (NOVÝ KONEKTOR + AUDIT) ===", flush=True) + print(f"Období: {d_from} až {d_to}", flush=True) + + # Load all accounts from accounts.json + try: + accounts = load_accounts(ACCOUNTS_FILE) + except Exception as e: + print(f"FATÁLNÍ CHYBA při načítání účtů: {e}", flush=True) + return + + print(f"  Účtů v konfiguraci: {len(accounts)}\n", flush=True) + + # ZMĚNA: Připojení pomocí mysql.connector + try: + conn = mysql.connector.connect( + host=DB["host"], + port=DB["port"], + user=DB["user"], + password=DB["password"], + database=DB["database"], + charset=DB["charset"] + ) + cur = conn.cursor() + except Error as e: + print(f"FATÁLNÍ CHYBA při připojení k DB: {e}", flush=True) + return + + # SQL INSERT dotaz přizpůsobený nové DB struktuře + sql = """ + INSERT INTO transactions + (id_operace, cislo_uctu, transaction_date, amount, currency, \ + protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ, \ + vs, ks, ss, uziv_identifikace, zprava_pro_prijemce, \ + provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce) + VALUES (%(id_operace)s, %(cislo_uctu)s, %(transaction_date)s, %(amount)s, %(currency)s, + %(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s, + %(vs)s, %(ks)s, %(ss)s, %(uziv_identifikace)s, %(zprava_pro_prijemce)s, + %(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s, \ + %(reference_platce)s) ON DUPLICATE KEY \ + UPDATE \ + cislo_uctu = \ + VALUES (cislo_uctu), transaction_date = \ + VALUES (transaction_date), amount = \ + VALUES (amount), currency = \ + VALUES (currency), protiucet = \ + VALUES (protiucet), kod_banky = \ + VALUES (kod_banky), nazev_protiuctu = \ + VALUES (nazev_protiuctu), nazev_banky = \ + VALUES (nazev_banky), typ = \ + VALUES (typ), vs = \ + VALUES (vs), ks = \ + VALUES (ks), ss = \ + VALUES (ss), uziv_identifikace = \ + VALUES (uziv_identifikace), zprava_pro_prijemce = \ + VALUES (zprava_pro_prijemce), provedl = \ + VALUES (provedl), id_pokynu = \ + VALUES (id_pokynu), komentar = \ + VALUES (komentar), upr_objem_mena = \ + VALUES (upr_objem_mena), api_bic = \ + VALUES (api_bic), reference_platce = \ + VALUES (reference_platce) \ + """ + + total_inserted = 0 + + # ====================================================== + # PROCESS EACH ACCOUNT IN accounts.json + # ====================================================== + for acc in accounts: + name = acc["name"] + cfg_acc_num = acc["account_number"] + token = acc["token"] + + print(f"--- Účet: {name} ({cfg_acc_num}) ---", flush=True) + t0 = time.time() + + # --- 1) Download JSON from Fio API + data = fetch_fio_json(token, d_from, d_to) + if data is None: + print("  Přeskakuji, žádná data / chyba API.\n", flush=True) + continue + + # --- 2) Save raw JSON file to disk + try: + json_path = save_json_for_account(JSON_BASE_DIR, acc, data, d_from, d_to) + print(f"  JSON uložen do: {json_path}", flush=True) + except Exception as e: + print(f"  ❌ Chyba při ukládání JSON souboru: {e}", flush=True) + pass + + # --- 3) Extract transactions from JSON tree + tlist = data.get("accountStatement", {}).get("transactionList", {}).get("transaction", []) + + if isinstance(tlist, dict): + tlist = [tlist] + + print(f"  Počet transakcí v období: {len(tlist)}", flush=True) + + if not tlist: + print("  Žádné transakce, jdu dál.\n", flush=True) + continue + + fio_acc_id = data.get("accountStatement", {}).get("info", {}).get("accountId") + + if cfg_acc_num and fio_acc_id and cfg_acc_num.split("/")[0] not in fio_acc_id: + print(f"  ⚠ Upozornění: accountId z Fio ({fio_acc_id}) " + f"se neshoduje s account_number v konfiguraci ({cfg_acc_num})", flush=True) + + # --- 4) Build list of MySQL rows (MAPOVÁNÍ S KONVERZÍ NA STR) + rows = [] + for t in tlist: + id_operace_val = safe_col(t, 22) + id_pokynu_val = safe_col(t, 17) + + row = { + "id_operace": str(id_operace_val) if id_operace_val is not None else None, + "cislo_uctu": fio_acc_id, + "transaction_date": clean_date(safe_col(t, 0)), + "amount": safe_col(t, 1), + "currency": safe_col(t, 14), + "typ": safe_col(t, 8), + "provedl": safe_col(t, 9), + + "protiucet": safe_col(t, 2), + "kod_banky": safe_col(t, 3), + "nazev_protiuctu": safe_col(t, 10), + "nazev_banky": safe_col(t, 12), + "api_bic": safe_col(t, 26), + + "vs": safe_col(t, 5), + "ks": safe_col(t, 4), + "ss": safe_col(t, 6), + + "zprava_pro_prijemce": safe_col(t, 16), + "uziv_identifikace": safe_col(t, 7), + "komentar": safe_col(t, 25), + "upr_objem_mena": safe_col(t, 18), + "id_pokynu": str(id_pokynu_val) if id_pokynu_val is not None else None, + "reference_platce": safe_col(t, 27), + } + rows.append(row) + + # --- 5) INSERT rows into MySQL in batches S AUDITEM + inserted = 0 + actual_inserted_count = 0 # Počet řádků potvrzených auditem + + for i in range(0, len(rows), BATCH_SIZE): + chunk = rows[i: i + BATCH_SIZE] + + # Získej ID transakcí z aktuální dávky pro audit + chunk_ids = [row["id_operace"] for row in chunk if row["id_operace"] is not None] + + try: + # Krok 5.1: Provedení zápisu + cur.executemany(sql, chunk) + conn.commit() + + # --- Krok 5.2: AUDIT: Zpětná kontrola --- + if chunk_ids: + # Vytvoření stringu ID pro SQL dotaz: ('id1', 'id2', ...) + id_string = ', '.join([f"'{i}'" for i in chunk_ids]) + audit_query = f"SELECT COUNT(*) FROM transactions WHERE id_operace IN ({id_string})" + + cur.execute(audit_query) + # ZMĚNA: Používáme fetchone()[0] pro mysql.connector + found_count = cur.fetchone()[0] + + if found_count != len(chunk): + print( + f"  ⚠ AUDIT SELHAL: Zapsáno {len(chunk)}, ale v databázi nalezeno jen {found_count} pro tuto dávku!", + flush=True) + # Pokud audit selhal, tiskneme, která ID chybí (pro debug) + # To je složitější, ale alespoň víme, kolik jich chybí. + + actual_inserted_count += found_count + else: + # Pokud dávka neobsahuje ID (což by neměla, ale pro jistotu) + actual_inserted_count += len(chunk) + + inserted += len(chunk) + + except Error as e: + print(f"  ❌ Chyba při zápisu do DB: {e}", flush=True) + conn.rollback() + break # Přerušíme cyklus, pokud nastane kritická chyba + + elapsed = time.time() - t0 + total_inserted += actual_inserted_count # Sčítáme počet potvrzený auditem + + print(f"  ✓ Zapsáno (potvrzeno auditem): {actual_inserted_count} řádků do DB za {elapsed:.2f} s\n", flush=True) + # Nyní by měl tento počet odpovídat počtu transakcí pro daný účet, pokud nebyly žádné duplicity. + + # Close DB + cur.close() + conn.close() + + total_elapsed = time.time() - start_all + + print(f"=== Hotovo. Celkem zapsáno (potvrzeno auditem) {total_inserted} transakcí. " + f"Celkový čas: {total_elapsed:.2f} s ===", flush=True) + + +# ====================================================== +# ENTRY POINT +# ====================================================== + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/#10 Download reports/Trash/30 test.py b/#10 Download reports/Trash/30 test.py new file mode 100644 index 0000000..9cedbb2 --- /dev/null +++ b/#10 Download reports/Trash/30 test.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import sys +import io +import time +import json +# ZMĚNA: Importujeme oficiální konektor +import mysql.connector +from mysql.connector import Error +from typing import Dict, Any, List +from pathlib import Path +from datetime import date + +# ==================================================================== +# A. PONECHÁNO: Vynucení UTF-8 pro správnou diakritiku +# ==================================================================== +sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace') +sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace') + +# ========================================= +# KONFIGURACE PRO TEST +# ========================================= + +TEST_JSON_PATH = r"z:\Dropbox\!!!Days\Downloads Z230\Fio\2100074583\2025-09-06_to_2025-12-05.json" +TEST_ACCOUNT_ID = "2100074583" + +# MySQL connection parameters +DB = { + "host": "192.168.1.76", + "port": 3307, + "user": "root", + "password": "Vlado9674+", + "database": "fio", + "charset": "utf8", # mysql-connector používá 'utf8' nebo 'utf8mb4' +} + + +# ========================================= +# POMOCNÉ FUNKCE (Z vašeho původního kódu) +# ========================================= + +def safe_col(t: dict, n: int) -> Any: + """SAFE ACCESSOR for Fio transaction column numbers (ColumnN).""" + key = f"column{n}" + val = t.get(key) + if not val: + return None + return val.get("value") + + +def clean_date(dt_str: str) -> str: + """Strips timezone from Fio date string ("YYYY-MM-DD+HH:MM") → "YYYY-MM-DD".""" + if not dt_str: + return None + return str(dt_str)[:10] + + +def load_test_data(path: str) -> Any: + """Načte data přímo ze souboru JSON.""" + try: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + print(f"FATÁLNÍ CHYBA: Nelze načíst JSON soubor z {path}: {e}", flush=True) + return None + + +# ========================================= +# HLAVNÍ TESTOVACÍ LOGIKA +# ========================================= + +def run_test(): + start_all = time.time() + + print("=== TEST VKLÁDÁNÍ DAT Z JSON SOUBORU (OPRAVA S MYSQL-CONNECTOR) ===", flush=True) + print(f"Zdroj dat: {TEST_JSON_PATH}", flush=True) + + # KROK 1: Načtení dat přímo z JSON souboru + data = load_test_data(TEST_JSON_PATH) + if data is None: + return + + # KROK 2: Extrakce transakcí + tlist = data.get("accountStatement", {}).get("transactionList", {}).get("transaction", []) + if isinstance(tlist, dict): + tlist = [tlist] + + fio_acc_id = data.get("accountStatement", {}).get("info", {}).get("accountId") + expected_count = len(tlist) + + print(f"Očekávaný počet transakcí v JSON: {expected_count}", flush=True) + + if expected_count == 0: + print("Test přeskočen, JSON je prázdný.", flush=True) + return + + # KROK 3: Připojení k DB + try: + # ZMĚNA: Používáme mysql.connector + conn = mysql.connector.connect( + host=DB["host"], + port=DB["port"], + user=DB["user"], + password=DB["password"], + database=DB["database"], + charset=DB["charset"] + ) + cur = conn.cursor() + # ZMĚNA: Používáme dictionary=True, což je zde podporováno + cur_dict = conn.cursor(dictionary=True) + except Error as e: + print(f"FATÁLNÍ CHYBA: Nelze se připojit k DB: {e}", flush=True) + return + + # KROK 4: Sestavení SQL dotazu a řádků (Konverze ID na string zůstává pro bezpečnost) + sql = """ + INSERT INTO transactions + (id_operace, cislo_uctu, transaction_date, amount, currency, \ + protiucet, kod_banky, nazev_protiuctu, nazev_banky, typ, \ + vs, ks, ss, uziv_identifikace, zprava_pro_prijemce, \ + provedl, id_pokynu, komentar, upr_objem_mena, api_bic, reference_platce) + VALUES (%(id_operace)s, %(cislo_uctu)s, %(transaction_date)s, %(amount)s, %(currency)s, + %(protiucet)s, %(kod_banky)s, %(nazev_protiuctu)s, %(nazev_banky)s, %(typ)s, + %(vs)s, %(ks)s, %(ss)s, %(uziv_identifikace)s, %(zprava_pro_prijemce)s, + %(provedl)s, %(id_pokynu)s, %(komentar)s, %(upr_objem_mena)s, %(api_bic)s, \ + %(reference_platce)s) ON DUPLICATE KEY \ + UPDATE \ + cislo_uctu = \ + VALUES (cislo_uctu), transaction_date = \ + VALUES (transaction_date), amount = \ + VALUES (amount), currency = \ + VALUES (currency), protiucet = \ + VALUES (protiucet), kod_banky = \ + VALUES (kod_banky), nazev_protiuctu = \ + VALUES (nazev_protiuctu), nazev_banky = \ + VALUES (nazev_banky), typ = \ + VALUES (typ), vs = \ + VALUES (vs), ks = \ + VALUES (ks), ss = \ + VALUES (ss), uziv_identifikace = \ + VALUES (uziv_identifikace), zprava_pro_prijemce = \ + VALUES (zprava_pro_prijemce), provedl = \ + VALUES (provedl), id_pokynu = \ + VALUES (id_pokynu), komentar = \ + VALUES (komentar), upr_objem_mena = \ + VALUES (upr_objem_mena), api_bic = \ + VALUES (api_bic), reference_platce = \ + VALUES (reference_platce) \ + """ + + rows = [] + for t in tlist: + id_operace_val = safe_col(t, 22) + id_pokynu_val = safe_col(t, 17) + + row = { + # Klíčová konverze ID na string (VARCHAR) + "id_operace": str(id_operace_val) if id_operace_val is not None else None, + "cislo_uctu": fio_acc_id, + "transaction_date": clean_date(safe_col(t, 0)), + "amount": safe_col(t, 1), + "currency": safe_col(t, 14), + "typ": safe_col(t, 8), + "provedl": safe_col(t, 9), + + "protiucet": safe_col(t, 2), + "kod_banky": safe_col(t, 3), + "nazev_protiuctu": safe_col(t, 10), + "nazev_banky": safe_col(t, 12), + "api_bic": safe_col(t, 26), + + "vs": safe_col(t, 5), + "ks": safe_col(t, 4), + "ss": safe_col(t, 6), + + "zprava_pro_prijemce": safe_col(t, 16), + "uziv_identifikace": safe_col(t, 7), + "komentar": safe_col(t, 25), + "upr_objem_mena": safe_col(t, 18), + "id_pokynu": str(id_pokynu_val) if id_pokynu_val is not None else None, + "reference_platce": safe_col(t, 27), + } + rows.append(row) + + # KROK 5: Vložení dat do DB + inserted = 0 + try: + # Používáme executemany s připravenými daty + cur.executemany(sql, rows) + conn.commit() + inserted = len(rows) + except Error as e: + print(f"  ❌ Chyba při VKLÁDÁNÍ do DB: {e}", flush=True) + conn.rollback() + + # KROK 6: Kontrola výsledku v DB (používá Dictionary=True) + check_query = f"SELECT count(*) AS count FROM transactions WHERE cislo_uctu = '{fio_acc_id}'" + cur_dict.execute(check_query) + current_db_count = cur_dict.fetchone()['count'] + + conn.close() + + print(f"\n--- SHRNUTÍ TESTU ---", flush=True) + print(f"Očekávalo se vložení/aktualizace řádků: {expected_count}", flush=True) + print(f"Počet řádků zpracovaných skriptem: {inserted}", flush=True) + print(f"Aktuální počet záznamů pro účet {fio_acc_id} v DB: {current_db_count}", flush=True) + print(f"Celkový čas: {time.time() - start_all:.2f} s", flush=True) + + if inserted == expected_count and current_db_count >= expected_count: + print("✅ TEST ÚSPĚŠNÝ: Všechny transakce byly vloženy/aktualizovány, nebo DB obsahuje očekávaný počet.", + flush=True) + else: + print("🔥 TEST SELHAL: Existuje nesoulad mezi očekávaným a skutečným počtem záznamů.", flush=True) + + +# ====================================================== +# ENTRY POINT +# ====================================================== + +if __name__ == "__main__": + run_test() \ No newline at end of file diff --git a/#10 Download reports/accounts.json b/#10 Download reports/accounts.json new file mode 100644 index 0000000..101357a --- /dev/null +++ b/#10 Download reports/accounts.json @@ -0,0 +1,53 @@ +[ + { + "name": "EUR tatínek 1", + "account_number": "2100074583", + "token": "GuV2Boaulx56ZiQUqUArgg6P9qdfEVKOoH6wF3PfAZ0fPS01r2WbiNiCsCcIBZ0U" + }, + { + "name": "CZK rodina", + "account_number": "2100046291", + "token": "v0GJaAVeefzV1lnx1jPCf2nFF7SuOPzzrL5tobPNsC7oCChXG4hahDYVb8Rdcex0" + }, + { + "name": "EUR TrialHelp", + "account_number": "2200787265", + "token": "9yG5g6lHWGS6YU2R2petm5DRYTb9orhJ8VPJ0p7RtTjlIo2vB83ynBlPCMGRIwzy" + }, + { + "name": "CZK tatínek", + "account_number": "2400046293", + "token": "j2qmpvWe4RfKtBTBlhwC1VFED7HJlVAe23iPBH1TWis9htEyYe8fRejcMeSxOLqC" + }, + { + "name": "CHF tatínek", + "account_number": "2402161017", + "token": "aNfK9iu6qIPlugGCR6gvSJ7NXtTkDfVVj8fBz4X1pORuGKf6VXjWin4wrr9WRjSd" + }, + { + "name": "EUR tatínek 2", + "account_number": "2500074582", + "token": "aLsl9ETRUU1IgoYeinAzYWyruIoJvs6UvJKTGRlJcm7HaEc5ojsFdxJizyT9lREO" + }, + { + "name": "CZK TrialHelp", + "account_number": "2900046548", + "token": "pKZVHbFDVsbTa8ryEaVc6A2nyrlb4TbT1tCiimieesHvhKFoJmYBRVjCpnvjiUUK" + }, + { + "name": "CZK maminka svojě věci", + "account_number": "2003310572", + "token": "TkrRvnMK77OSSYdVulNvZcT6ltWcmjqkp3RN5WYwnBpNTuaKCWO1zHKOlDGAiNyv" + }, + { + "name": "CZK na jídlo", + "account_number": "2403310563", + "token": "axRvFxu4VCzsDp5QZXN8LQ0fQUqzV2FEBZrM595x3Rtp10zowRBcGOFs9uNNPb7Q" + }, + { + "name": "CZK ordinace", + "account_number": "2800046620", + "token": "Xzdr3eK7se7ZgeE3JujgeidGb0WrB7mGQ6HSOiBJzWi0kPURYKRpkRKB3ZOpt3rq" + } +] + diff --git a/#10 Download reports/expenses_tabs.json b/#10 Download reports/expenses_tabs.json new file mode 100644 index 0000000..f84bb0d --- /dev/null +++ b/#10 Download reports/expenses_tabs.json @@ -0,0 +1,40 @@ +{ + "tabs": [ + { + "name": "All expenses", + "sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND amount < 0 AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;" + }, + { + "name": "Vakciny Ptacek", + "sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '220205630' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;" + }, + { + "name": "Poliklinika Prosek", + "sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '1387720540' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;" + }, + { + "name": "Card payments", + "sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet IS NULL AND typ = 'Platba kartou' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;" + }, + { + "name": "Vakciny Avenier", + "sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '5050012811' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;" + }, + { + "name": "Socialka", + "sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '1011-7926201' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;" + }, + { + "name": "Zdravotka", + "sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '1112001221' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;" + }, + { + "name": "MEDIPOS", + "sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '14309711' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;" + }, + { + "name": "MEDEVIO", + "sql": "SELECT * FROM transactions WHERE cislo_uctu = '2800046620' AND protiucet = '2701907026' AND transaction_date >= '2024-09-30' ORDER BY transaction_date DESC;" + } + ] +}