diff --git a/import_vzp_pracoviste.py b/import_vzp_pracoviste.py index 70ec27e..663757d 100644 --- a/import_vzp_pracoviste.py +++ b/import_vzp_pracoviste.py @@ -1,13 +1,18 @@ """ Import VZP číselníku pracovišť (soubory *.Lh7) do MySQL tabulky vzp_pracoviste. -Spouštět každý týden po stažení nového souboru do složky Import/. +Před importem automaticky stáhne nejnovější soubor z VZP Point (vyžaduje certifikát). +Použití: python import_vzp_pracoviste.py [--no-download] [soubor.Lh7] """ import csv import glob +import io import os +import re import sys +import zipfile from datetime import date, datetime +from html.parser import HTMLParser # Windows konzole - povol UTF-8 výstup if sys.stdout.encoding != "utf-8": @@ -26,6 +31,10 @@ DB_CONFIG = { IMPORT_DIR = os.path.join(os.path.dirname(__file__), "Import") +VZP_POINT_DOC_URL = "https://point.vzp.cz/Cms/Document" +VZP_CERT_FILE = os.path.join(os.path.dirname(__file__), "MichalkaPublicCertProPython.pfx") +VZP_CERT_PASSWORD = "" # nastav heslo PFX souboru, pokud bylo při exportu zadáno + CREATE_TABLE_SQL = """ CREATE TABLE IF NOT EXISTS vzp_pracoviste ( id INT NOT NULL AUTO_INCREMENT, @@ -65,6 +74,100 @@ def parse_date(s: str) -> date | None: return None +def download_latest_file() -> str | None: + """ + Přihlásí se na VZP Point certifikátem, stáhne nejnovější ICP ZIP, + rozbalí PLP111*.Lh7 do Import/ a vrátí cestu. Při chybě vrátí None. + """ + try: + from requests_pkcs12 import Pkcs12Adapter + import requests + except ImportError: + print("[stahování] Chybí knihovny: pip install requests requests-pkcs12") + return None + + password = VZP_CERT_PASSWORD.encode() if VZP_CERT_PASSWORD else None + + session = requests.Session() + session.mount("https://point.vzp.cz", Pkcs12Adapter( + pkcs12_filename=VZP_CERT_FILE, + pkcs12_password=password, + )) + + # Načti stránku s dokumenty + try: + resp = session.get(VZP_POINT_DOC_URL, timeout=30) + resp.raise_for_status() + except Exception as e: + print(f"[stahování] Chyba při načtení {VZP_POINT_DOC_URL}: {e}") + return None + + # Najdi odkaz na *-icp.zip + class _LinkParser(HTMLParser): + def __init__(self): + super().__init__() + self.icp_links: list[str] = [] + + def handle_starttag(self, tag, attrs): + if tag == "a": + href = dict(attrs).get("href", "") + if re.search(r"-icp\.zip", href, re.IGNORECASE): + self.icp_links.append(href) + + parser = _LinkParser() + parser.feed(resp.text) + + if not parser.icp_links: + print("[stahování] Na stránce VZP Point nebyl nalezen odkaz na *-icp.zip") + return None + + # Vyber nejnovější dle data v názvu (YYMMDDHHMMSS-icp.zip) + zip_href = sorted(parser.icp_links)[-1] + if not zip_href.startswith("http"): + zip_href = "https://point.vzp.cz" + zip_href + + # Zkontroluj, jestli už máme aktuální soubor (podle data v názvu ZIP) + date_match = re.search(r"(\d{6})\d{6}-icp\.zip", zip_href) + zip_date = date_match.group(1) if date_match else "" # YYMMDD + lh7_name = f"PLP111{zip_date[:2]}.Lh7" if zip_date else "PLP111??.Lh7" + dest = os.path.join(IMPORT_DIR, lh7_name) + + # Najdi případný existující soubor pro stejný rok + year_suffix = zip_date[:2] if zip_date else "" + existing = glob.glob(os.path.join(IMPORT_DIR, f"PLP111{year_suffix}.Lh7")) + if existing: + print(f"[stahování] {os.path.basename(existing[0])} již existuje — přeskočeno.") + return existing[0] + + # Stáhni ZIP + print(f"[stahování] {zip_href}") + try: + zip_resp = session.get(zip_href, timeout=60) + zip_resp.raise_for_status() + except Exception as e: + print(f"[stahování] Chyba při stahování ZIP: {e}") + return None + + # Rozbal Lh7 z archivu + try: + with zipfile.ZipFile(io.BytesIO(zip_resp.content)) as zf: + lh7_names = [n for n in zf.namelist() if n.lower().endswith(".lh7")] + if not lh7_names: + print("[stahování] ZIP neobsahuje žádný .Lh7 soubor") + return None + lh7_entry = lh7_names[0] + dest = os.path.join(IMPORT_DIR, os.path.basename(lh7_entry)) + os.makedirs(IMPORT_DIR, exist_ok=True) + with zf.open(lh7_entry) as src, open(dest, "wb") as out: + out.write(src.read()) + except Exception as e: + print(f"[stahování] Chyba při rozbalování ZIP: {e}") + return None + + print(f"[stahování] Rozbaleno: {os.path.basename(dest)} ({os.path.getsize(dest):,} B)") + return dest + + def find_latest_file() -> str: files = glob.glob(os.path.join(IMPORT_DIR, "*.Lh7")) if not files: @@ -140,7 +243,19 @@ def import_file(filepath: str, conn: mysql.connector.MySQLConnection) -> int: def main(): - filepath = sys.argv[1] if len(sys.argv) > 1 else find_latest_file() + args = sys.argv[1:] + no_download = "--no-download" in args + args = [a for a in args if a != "--no-download"] + + if args: + filepath = args[0] + else: + if not no_download: + downloaded = download_latest_file() + if downloaded is None: + print("[stahování] Pokračuji s lokálním souborem...") + filepath = find_latest_file() + filename = os.path.basename(filepath) print(f"Soubor: {filename}")